1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::{Score, MultiThreadedLockableScore};
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the callback given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 /// is provided to [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
60 stop_thread: Arc<AtomicBool>,
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
65 const FRESHNESS_TIMER: u64 = 60;
67 const FRESHNESS_TIMER: u64 = 1;
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
83 const SCORER_PERSIST_TIMER: u64 = 30;
85 const SCORER_PERSIST_TIMER: u64 = 1;
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
92 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
94 P: Deref<Target = P2PGossipSync<G, A, L>>,
95 R: Deref<Target = RapidGossipSync<G, L>>,
96 G: Deref<Target = NetworkGraph<L>>,
100 where A::Target: chain::Access, L::Target: Logger {
101 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
103 /// Rapid gossip sync from a trusted server.
110 P: Deref<Target = P2PGossipSync<G, A, L>>,
111 R: Deref<Target = RapidGossipSync<G, L>>,
112 G: Deref<Target = NetworkGraph<L>>,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117 fn network_graph(&self) -> Option<&G> {
119 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121 GossipSync::None => None,
125 fn prunable_network_graph(&self) -> Option<&G> {
127 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128 GossipSync::Rapid(gossip_sync) => {
129 if gossip_sync.is_initial_sync_complete() {
130 Some(gossip_sync.network_graph())
135 GossipSync::None => None,
140 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
141 struct DecoratingEventHandler<
144 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
145 RGS: Deref<Target = RapidGossipSync<G, L>>,
146 G: Deref<Target = NetworkGraph<L>>,
150 where A::Target: chain::Access, L::Target: Logger {
152 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
158 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
159 RGS: Deref<Target = RapidGossipSync<G, L>>,
160 G: Deref<Target = NetworkGraph<L>>,
163 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
164 where A::Target: chain::Access, L::Target: Logger {
165 fn handle_event(&self, event: &Event) {
166 if let Some(network_graph) = self.gossip_sync.network_graph() {
167 network_graph.handle_event(event);
169 self.event_handler.handle_event(event);
173 impl BackgroundProcessor {
174 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
177 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
178 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
179 /// either [`join`] or [`stop`].
181 /// # Data Persistence
183 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
184 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
185 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
186 /// provided implementation.
188 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
189 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
190 /// See the `lightning-persister` crate for LDK's provided implementation.
192 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
193 /// error or call [`join`] and handle any error that may arise. For the latter case,
194 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
198 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
199 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
200 /// functionality implemented by other handlers.
201 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
203 /// # Rapid Gossip Sync
205 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
206 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
207 /// until the [`RapidGossipSync`] instance completes its first sync.
209 /// [top-level documentation]: BackgroundProcessor
210 /// [`join`]: Self::join
211 /// [`stop`]: Self::stop
212 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
213 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
214 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
215 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
216 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
217 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
220 Signer: 'static + Sign,
221 CA: 'static + Deref + Send + Sync,
222 CF: 'static + Deref + Send + Sync,
223 CW: 'static + Deref + Send + Sync,
224 T: 'static + Deref + Send + Sync,
225 K: 'static + Deref + Send + Sync,
226 F: 'static + Deref + Send + Sync,
227 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
228 L: 'static + Deref + Send + Sync,
229 P: 'static + Deref + Send + Sync,
230 Descriptor: 'static + SocketDescriptor + Send + Sync,
231 CMH: 'static + Deref + Send + Sync,
232 RMH: 'static + Deref + Send + Sync,
233 EH: 'static + EventHandler + Send,
234 PS: 'static + Deref + Send,
235 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
236 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
237 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
238 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
239 UMH: 'static + Deref + Send + Sync,
240 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
243 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
244 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<&'static MultiThreadedLockableScore<SC>>,
247 CA::Target: 'static + chain::Access,
248 CF::Target: 'static + chain::Filter,
249 CW::Target: 'static + chain::Watch<Signer>,
250 T::Target: 'static + BroadcasterInterface,
251 K::Target: 'static + KeysInterface<Signer = Signer>,
252 F::Target: 'static + FeeEstimator,
253 L::Target: 'static + Logger,
254 P::Target: 'static + Persist<Signer>,
255 CMH::Target: 'static + ChannelMessageHandler,
256 RMH::Target: 'static + RoutingMessageHandler,
257 UMH::Target: 'static + CustomMessageHandler,
258 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
260 let stop_thread = Arc::new(AtomicBool::new(false));
261 let stop_thread_clone = stop_thread.clone();
262 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
263 let event_handler = DecoratingEventHandler {
265 gossip_sync: &gossip_sync,
268 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
269 channel_manager.timer_tick_occurred();
271 let mut last_freshness_call = Instant::now();
272 let mut last_ping_call = Instant::now();
273 let mut last_prune_call = Instant::now();
274 let mut last_scorer_persist_call = Instant::now();
275 let mut have_pruned = false;
278 channel_manager.process_pending_events(&event_handler);
279 chain_monitor.process_pending_events(&event_handler);
281 // Note that the PeerManager::process_events may block on ChannelManager's locks,
282 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
283 // we want to ensure we get into `persist_manager` as quickly as we can, especially
284 // without running the normal event processing above and handing events to users.
286 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
287 // processing a message effectively at any point during this loop. In order to
288 // minimize the time between such processing completing and persisting the updated
289 // ChannelManager, we want to minimize methods blocking on a ChannelManager
290 // generally, and as a fallback place such blocking only immediately before
292 peer_manager.process_events();
294 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
295 // see `await_start`'s use below.
296 let await_start = Instant::now();
297 let updates_available =
298 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
299 let await_time = await_start.elapsed();
301 if updates_available {
302 log_trace!(logger, "Persisting ChannelManager...");
303 persister.persist_manager(&*channel_manager)?;
304 log_trace!(logger, "Done persisting ChannelManager.");
306 // Exit the loop if the background processor was requested to stop.
307 if stop_thread.load(Ordering::Acquire) == true {
308 log_trace!(logger, "Terminating background processor.");
311 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
312 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
313 channel_manager.timer_tick_occurred();
314 last_freshness_call = Instant::now();
316 if await_time > Duration::from_secs(1) {
317 // On various platforms, we may be starved of CPU cycles for several reasons.
318 // E.g. on iOS, if we've been in the background, we will be entirely paused.
319 // Similarly, if we're on a desktop platform and the device has been asleep, we
320 // may not get any cycles.
321 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
322 // full second, at which point we assume sockets may have been killed (they
323 // appear to be at least on some platforms, even if it has only been a second).
324 // Note that we have to take care to not get here just because user event
325 // processing was slow at the top of the loop. For example, the sample client
326 // may call Bitcoin Core RPCs during event handling, which very often takes
327 // more than a handful of seconds to complete, and shouldn't disconnect all our
329 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
330 peer_manager.disconnect_all_peers();
331 last_ping_call = Instant::now();
332 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
333 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
334 peer_manager.timer_tick_occurred();
335 last_ping_call = Instant::now();
338 // Note that we want to run a graph prune once not long after startup before
339 // falling back to our usual hourly prunes. This avoids short-lived clients never
340 // pruning their network graph. We run once 60 seconds after startup before
341 // continuing our normal cadence.
342 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
343 // The network graph must not be pruned while rapid sync completion is pending
344 log_trace!(logger, "Assessing prunability of network graph");
345 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
346 network_graph.remove_stale_channels();
348 if let Err(e) = persister.persist_graph(network_graph) {
349 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
352 last_prune_call = Instant::now();
355 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
359 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
360 if let Some(ref scorer) = scorer {
361 log_trace!(logger, "Persisting scorer");
362 if let Err(e) = persister.persist_scorer(&scorer) {
363 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
366 last_scorer_persist_call = Instant::now();
370 // After we exit, ensure we persist the ChannelManager one final time - this avoids
371 // some races where users quit while channel updates were in-flight, with
372 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
373 persister.persist_manager(&*channel_manager)?;
375 // Persist Scorer on exit
376 if let Some(ref scorer) = scorer {
377 persister.persist_scorer(&scorer)?;
380 // Persist NetworkGraph on exit
381 if let Some(network_graph) = gossip_sync.network_graph() {
382 persister.persist_graph(network_graph)?;
387 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
390 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
391 /// [`ChannelManager`].
395 /// This function panics if the background thread has panicked such as while persisting or
398 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
399 pub fn join(mut self) -> Result<(), std::io::Error> {
400 assert!(self.thread_handle.is_some());
404 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
405 /// [`ChannelManager`].
409 /// This function panics if the background thread has panicked such as while persisting or
412 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
413 pub fn stop(mut self) -> Result<(), std::io::Error> {
414 assert!(self.thread_handle.is_some());
415 self.stop_and_join_thread()
418 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
419 self.stop_thread.store(true, Ordering::Release);
423 fn join_thread(&mut self) -> Result<(), std::io::Error> {
424 match self.thread_handle.take() {
425 Some(handle) => handle.join().unwrap(),
431 impl Drop for BackgroundProcessor {
433 self.stop_and_join_thread().unwrap();
439 use bitcoin::blockdata::block::BlockHeader;
440 use bitcoin::blockdata::constants::genesis_block;
441 use bitcoin::blockdata::transaction::{Transaction, TxOut};
442 use bitcoin::network::constants::Network;
443 use lightning::chain::{BestBlock, Confirm, chainmonitor};
444 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
445 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
446 use lightning::chain::transaction::OutPoint;
447 use lightning::get_event_msg;
448 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
449 use lightning::ln::features::{ChannelFeatures, InitFeatures};
450 use lightning::ln::msgs::{ChannelMessageHandler, Init};
451 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
452 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
453 use lightning::util::config::UserConfig;
454 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
455 use lightning::util::ser::Writeable;
456 use lightning::util::test_utils;
457 use lightning::util::persist::KVStorePersister;
458 use lightning_invoice::payment::{InvoicePayer, Retry};
459 use lightning_invoice::utils::DefaultRouter;
460 use lightning_persister::FilesystemPersister;
462 use std::path::PathBuf;
463 use std::sync::{Arc, Mutex};
464 use std::sync::mpsc::SyncSender;
465 use std::time::Duration;
466 use lightning::routing::scoring::{FixedPenaltyScorer};
467 use lightning_rapid_gossip_sync::RapidGossipSync;
468 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
470 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
472 #[derive(Clone, Eq, Hash, PartialEq)]
473 struct TestDescriptor{}
474 impl SocketDescriptor for TestDescriptor {
475 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
479 fn disconnect_socket(&mut self) {}
482 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
484 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
485 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
488 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
489 p2p_gossip_sync: PGS,
490 rapid_gossip_sync: RGS,
491 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
492 chain_monitor: Arc<ChainMonitor>,
493 persister: Arc<FilesystemPersister>,
494 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
495 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
496 logger: Arc<test_utils::TestLogger>,
497 best_block: BestBlock,
498 scorer: Arc<Mutex<FixedPenaltyScorer>>,
502 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
503 GossipSync::P2P(self.p2p_gossip_sync.clone())
506 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
507 GossipSync::Rapid(self.rapid_gossip_sync.clone())
510 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
517 let data_dir = self.persister.get_data_dir();
518 match fs::remove_dir_all(data_dir.clone()) {
519 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
526 graph_error: Option<(std::io::ErrorKind, &'static str)>,
527 graph_persistence_notifier: Option<SyncSender<()>>,
528 manager_error: Option<(std::io::ErrorKind, &'static str)>,
529 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
530 filesystem_persister: FilesystemPersister,
534 fn new(data_dir: String) -> Self {
535 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
536 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
539 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
540 Self { graph_error: Some((error, message)), ..self }
543 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
544 Self { graph_persistence_notifier: Some(sender), ..self }
547 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
548 Self { manager_error: Some((error, message)), ..self }
551 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
552 Self { scorer_error: Some((error, message)), ..self }
556 impl KVStorePersister for Persister {
557 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
558 if key == "manager" {
559 if let Some((error, message)) = self.manager_error {
560 return Err(std::io::Error::new(error, message))
564 if key == "network_graph" {
565 if let Some(sender) = &self.graph_persistence_notifier {
566 sender.send(()).unwrap();
569 if let Some((error, message)) = self.graph_error {
570 return Err(std::io::Error::new(error, message))
575 if let Some((error, message)) = self.scorer_error {
576 return Err(std::io::Error::new(error, message))
580 self.filesystem_persister.persist(key, object)
584 fn get_full_filepath(filepath: String, filename: String) -> String {
585 let mut path = PathBuf::from(filepath);
587 path.to_str().unwrap().to_string()
590 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
591 let mut nodes = Vec::new();
592 for i in 0..num_nodes {
593 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
594 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
595 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
596 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
597 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
598 let seed = [i as u8; 32];
599 let network = Network::Testnet;
600 let genesis_block = genesis_block(network);
601 let now = Duration::from_secs(genesis_block.header.time as u64);
602 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
603 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
604 let best_block = BestBlock::from_genesis(network);
605 let params = ChainParameters { network, best_block };
606 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
607 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
608 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
609 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
610 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
611 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
612 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
613 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
617 for i in 0..num_nodes {
618 for j in (i+1)..num_nodes {
619 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
620 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
627 macro_rules! open_channel {
628 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
629 begin_open_channel!($node_a, $node_b, $channel_value);
630 let events = $node_a.node.get_and_clear_pending_events();
631 assert_eq!(events.len(), 1);
632 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
633 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
638 macro_rules! begin_open_channel {
639 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
640 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
641 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
642 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
646 macro_rules! handle_funding_generation_ready {
647 ($event: expr, $channel_value: expr) => {{
649 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
650 assert_eq!(channel_value_satoshis, $channel_value);
651 assert_eq!(user_channel_id, 42);
653 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
654 value: channel_value_satoshis, script_pubkey: output_script.clone(),
656 (temporary_channel_id, tx)
658 _ => panic!("Unexpected event"),
663 macro_rules! end_open_channel {
664 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
665 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
666 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
667 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
671 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
673 let prev_blockhash = node.best_block.block_hash();
674 let height = node.best_block.height() + 1;
675 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
676 let txdata = vec![(0, tx)];
677 node.best_block = BestBlock::new(header.block_hash(), height);
680 node.node.transactions_confirmed(&header, &txdata, height);
681 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
684 node.node.best_block_updated(&header, height);
685 node.chain_monitor.best_block_updated(&header, height);
691 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
692 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
696 fn test_background_processor() {
697 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
698 // updates. Also test that when new updates are available, the manager signals that it needs
699 // re-persistence and is successfully re-persisted.
700 let nodes = create_nodes(2, "test_background_processor".to_string());
702 // Go through the channel creation process so that each node has something to persist. Since
703 // open_channel consumes events, it must complete before starting BackgroundProcessor to
704 // avoid a race with processing events.
705 let tx = open_channel!(nodes[0], nodes[1], 100000);
707 // Initiate the background processors to watch each node.
708 let data_dir = nodes[0].persister.get_data_dir();
709 let persister = Arc::new(Persister::new(data_dir));
710 let event_handler = |_: &_| {};
711 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
713 macro_rules! check_persisted_data {
714 ($node: expr, $filepath: expr) => {
715 let mut expected_bytes = Vec::new();
717 expected_bytes.clear();
718 match $node.write(&mut expected_bytes) {
720 match std::fs::read($filepath) {
722 if bytes == expected_bytes {
731 Err(e) => panic!("Unexpected error: {}", e)
737 // Check that the initial channel manager data is persisted as expected.
738 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
739 check_persisted_data!(nodes[0].node, filepath.clone());
742 if !nodes[0].node.get_persistence_condvar_value() { break }
745 // Force-close the channel.
746 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
748 // Check that the force-close updates are persisted.
749 check_persisted_data!(nodes[0].node, filepath.clone());
751 if !nodes[0].node.get_persistence_condvar_value() { break }
754 // Check network graph is persisted
755 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
756 check_persisted_data!(nodes[0].network_graph, filepath.clone());
758 // Check scorer is persisted
759 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
760 check_persisted_data!(nodes[0].scorer, filepath.clone());
762 assert!(bg_processor.stop().is_ok());
766 fn test_timer_tick_called() {
767 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
768 // `FRESHNESS_TIMER`.
769 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
770 let data_dir = nodes[0].persister.get_data_dir();
771 let persister = Arc::new(Persister::new(data_dir));
772 let event_handler = |_: &_| {};
773 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
775 let log_entries = nodes[0].logger.lines.lock().unwrap();
776 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
777 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
778 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
779 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
784 assert!(bg_processor.stop().is_ok());
788 fn test_channel_manager_persist_error() {
789 // Test that if we encounter an error during manager persistence, the thread panics.
790 let nodes = create_nodes(2, "test_persist_error".to_string());
791 open_channel!(nodes[0], nodes[1], 100000);
793 let data_dir = nodes[0].persister.get_data_dir();
794 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
795 let event_handler = |_: &_| {};
796 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
797 match bg_processor.join() {
798 Ok(_) => panic!("Expected error persisting manager"),
800 assert_eq!(e.kind(), std::io::ErrorKind::Other);
801 assert_eq!(e.get_ref().unwrap().to_string(), "test");
807 fn test_network_graph_persist_error() {
808 // Test that if we encounter an error during network graph persistence, an error gets returned.
809 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
810 let data_dir = nodes[0].persister.get_data_dir();
811 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
812 let event_handler = |_: &_| {};
813 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
815 match bg_processor.stop() {
816 Ok(_) => panic!("Expected error persisting network graph"),
818 assert_eq!(e.kind(), std::io::ErrorKind::Other);
819 assert_eq!(e.get_ref().unwrap().to_string(), "test");
825 fn test_scorer_persist_error() {
826 // Test that if we encounter an error during scorer persistence, an error gets returned.
827 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
828 let data_dir = nodes[0].persister.get_data_dir();
829 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
830 let event_handler = |_: &_| {};
831 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
833 match bg_processor.stop() {
834 Ok(_) => panic!("Expected error persisting scorer"),
836 assert_eq!(e.kind(), std::io::ErrorKind::Other);
837 assert_eq!(e.get_ref().unwrap().to_string(), "test");
843 fn test_background_event_handling() {
844 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
845 let channel_value = 100000;
846 let data_dir = nodes[0].persister.get_data_dir();
847 let persister = Arc::new(Persister::new(data_dir.clone()));
849 // Set up a background event handler for FundingGenerationReady events.
850 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
851 let event_handler = move |event: &Event| {
852 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
854 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
856 // Open a channel and check that the FundingGenerationReady event was handled.
857 begin_open_channel!(nodes[0], nodes[1], channel_value);
858 let (temporary_channel_id, funding_tx) = receiver
859 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
860 .expect("FundingGenerationReady not handled within deadline");
861 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
863 // Confirm the funding transaction.
864 confirm_transaction(&mut nodes[0], &funding_tx);
865 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
866 confirm_transaction(&mut nodes[1], &funding_tx);
867 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
868 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
869 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
870 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
871 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
873 assert!(bg_processor.stop().is_ok());
875 // Set up a background event handler for SpendableOutputs events.
876 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
877 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
878 let persister = Arc::new(Persister::new(data_dir));
879 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
881 // Force close the channel and check that the SpendableOutputs event was handled.
882 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
883 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
884 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
886 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
887 .expect("SpendableOutputs not handled within deadline");
889 Event::SpendableOutputs { .. } => {},
890 Event::ChannelClosed { .. } => {},
891 _ => panic!("Unexpected event: {:?}", event),
894 assert!(bg_processor.stop().is_ok());
898 fn test_scorer_persistence() {
899 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
900 let data_dir = nodes[0].persister.get_data_dir();
901 let persister = Arc::new(Persister::new(data_dir));
902 let event_handler = |_: &_| {};
903 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
906 let log_entries = nodes[0].logger.lines.lock().unwrap();
907 let expected_log = "Persisting scorer".to_string();
908 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
913 assert!(bg_processor.stop().is_ok());
917 fn test_not_pruning_network_graph_until_graph_sync_completion() {
918 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
919 let data_dir = nodes[0].persister.get_data_dir();
920 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
921 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
922 let network_graph = nodes[0].network_graph.clone();
923 let features = ChannelFeatures::empty();
924 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
925 .expect("Failed to update channel from partial announcement");
926 let original_graph_description = network_graph.to_string();
927 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
928 assert_eq!(network_graph.read_only().channels().len(), 1);
930 let event_handler = |_: &_| {};
931 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
934 let log_entries = nodes[0].logger.lines.lock().unwrap();
935 let expected_log_a = "Assessing prunability of network graph".to_string();
936 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
937 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
938 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
943 let initialization_input = vec![
944 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
945 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
946 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
947 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
948 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
949 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
950 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
951 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
952 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
953 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
954 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
955 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
956 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
958 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
960 // this should have added two channels
961 assert_eq!(network_graph.read_only().channels().len(), 3);
964 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
965 .expect("Network graph not pruned within deadline");
967 background_processor.stop().unwrap();
969 // all channels should now be pruned
970 assert_eq!(network_graph.read_only().channels().len(), 0);
974 fn test_invoice_payer() {
975 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
976 let random_seed_bytes = keys_manager.get_secure_random_bytes();
977 let nodes = create_nodes(2, "test_invoice_payer".to_string());
979 // Initiate the background processors to watch each node.
980 let data_dir = nodes[0].persister.get_data_dir();
981 let persister = Arc::new(Persister::new(data_dir));
982 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
983 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
984 let event_handler = Arc::clone(&invoice_payer);
985 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
986 assert!(bg_processor.stop().is_ok());