1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the callback given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`P2PGossipSync`] is provided to
44 /// [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be relied
47 /// upon as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
// Flag polled by the background thread's main loop; set to `true` (via `stop`/`Drop`)
// to request shutdown.
60 stop_thread: Arc<AtomicBool>,
// Handle to the spawned background thread; `None` once joined. The thread returns
// any persistence error so `join`/`stop` can surface it to the caller.
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Seconds between `ChannelManager::timer_tick_occurred` calls from the main loop.
// NOTE(review): each timer constant below has build-variant definitions selected by
// `#[cfg(...)]` attributes; some of those attributes are not visible in this excerpt —
// confirm against the full file before relying on which variant applies.
65 const FRESHNESS_TIMER: u64 = 60;
// Fast variant so tests can observe the timer firing within seconds.
67 const FRESHNESS_TIMER: u64 = 1;
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
// Test builds ping every second.
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between scorer persistence attempts (debug vs. test variants).
82 #[cfg(all(not(test), debug_assertions))]
83 const SCORER_PERSIST_TIMER: u64 = 30;
85 const SCORER_PERSIST_TIMER: u64 = 1;
// The first network-graph prune runs shortly after startup (60s) so short-lived
// clients still prune; afterwards the loop falls back to NETWORK_PRUNE_TIMER.
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
93 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
// NOTE(review): parts of the generic parameter list (e.g. `E`, `A`, `L`) and the
// `event_handler` field declaration are on lines not visible in this excerpt.
94 struct DecoratingEventHandler<
96 P: Deref<Target = P2PGossipSync<G, A, L>>,
97 G: Deref<Target = NetworkGraph<L>>,
101 where A::Target: chain::Access, L::Target: Logger {
// Optional gossip sync which, when present, is given each event before the user handler.
103 p2p_gossip_sync: Option<P>,
108 P: Deref<Target = P2PGossipSync<G, A, L>>,
109 G: Deref<Target = NetworkGraph<L>>,
112 > EventHandler for DecoratingEventHandler<E, P, G, A, L>
113 where A::Target: chain::Access, L::Target: Logger {
// Forward each event first to the gossip sync (if any), so it can react (e.g. to
// payment failures), then unconditionally to the wrapped user event handler.
114 fn handle_event(&self, event: &Event) {
115 if let Some(event_handler) = &self.p2p_gossip_sync {
116 event_handler.handle_event(event);
118 self.event_handler.handle_event(event);
122 impl BackgroundProcessor {
123 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
126 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
127 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
128 /// either [`join`] or [`stop`].
130 /// # Data Persistence
132 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
133 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
134 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
135 /// provided implementation.
137 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
138 /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
139 /// for LDK's provided implementation.
141 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
142 /// error or call [`join`] and handle any error that may arise. For the latter case,
143 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
147 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
148 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
149 /// functionality implemented by other handlers.
150 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
152 /// # Rapid Gossip Sync
154 /// If rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
155 /// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
156 /// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
158 /// [top-level documentation]: BackgroundProcessor
159 /// [`join`]: Self::join
160 /// [`stop`]: Self::stop
161 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
162 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
163 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
164 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
165 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
166 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
// NOTE(review): the `pub fn start<...>(` signature line is not visible in this excerpt;
// the generic parameters below (`Signer` through `RGS`) belong to that signature.
169 Signer: 'static + Sign,
170 CA: 'static + Deref + Send + Sync,
171 CF: 'static + Deref + Send + Sync,
172 CW: 'static + Deref + Send + Sync,
173 T: 'static + Deref + Send + Sync,
174 K: 'static + Deref + Send + Sync,
175 F: 'static + Deref + Send + Sync,
176 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
177 L: 'static + Deref + Send + Sync,
178 P: 'static + Deref + Send + Sync,
179 Descriptor: 'static + SocketDescriptor + Send + Sync,
180 CMH: 'static + Deref + Send + Sync,
181 RMH: 'static + Deref + Send + Sync,
182 EH: 'static + EventHandler + Send,
183 PS: 'static + Deref + Send,
184 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
185 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
186 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
187 UMH: 'static + Deref + Send + Sync,
188 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
189 S: 'static + Deref<Target = SC> + Send + Sync,
190 SC: WriteableScore<'a>,
191 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send
// Runtime arguments: the persister, the user event handler, and the LDK objects this
// processor drives. `p2p_gossip_sync`, `scorer` and `rapid_gossip_sync` are optional.
193 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
194 p2p_gossip_sync: Option<PGS>, peer_manager: PM, logger: L, scorer: Option<S>,
195 rapid_gossip_sync: Option<RGS>
198 CA::Target: 'static + chain::Access,
199 CF::Target: 'static + chain::Filter,
200 CW::Target: 'static + chain::Watch<Signer>,
201 T::Target: 'static + BroadcasterInterface,
202 K::Target: 'static + KeysInterface<Signer = Signer>,
203 F::Target: 'static + FeeEstimator,
204 L::Target: 'static + Logger,
205 P::Target: 'static + Persist<Signer>,
206 CMH::Target: 'static + ChannelMessageHandler,
207 RMH::Target: 'static + RoutingMessageHandler,
208 UMH::Target: 'static + CustomMessageHandler,
209 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
// Shared stop flag: the returned handle keeps one clone, the thread polls the other.
211 let stop_thread = Arc::new(AtomicBool::new(false));
212 let stop_thread_clone = stop_thread.clone();
213 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
// Wrap the user handler so the gossip sync (if any) sees every event first.
214 let event_handler = DecoratingEventHandler { event_handler, p2p_gossip_sync: p2p_gossip_sync.as_ref().map(|t| t.deref()) };
216 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
217 channel_manager.timer_tick_occurred();
// Track when each periodic task last ran so the loop below can fire them at the
// FRESHNESS_TIMER / PING_TIMER / *_PRUNE_TIMER / SCORER_PERSIST_TIMER cadences.
219 let mut last_freshness_call = Instant::now();
220 let mut last_ping_call = Instant::now();
221 let mut last_prune_call = Instant::now();
222 let mut last_scorer_persist_call = Instant::now();
223 let mut have_pruned = false;
// Each loop iteration: first drain pending events into the (decorated) handler.
226 channel_manager.process_pending_events(&event_handler);
227 chain_monitor.process_pending_events(&event_handler);
229 // Note that the PeerManager::process_events may block on ChannelManager's locks,
230 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
231 // we want to ensure we get into `persist_manager` as quickly as we can, especially
232 // without running the normal event processing above and handing events to users.
234 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
235 // processing a message effectively at any point during this loop. In order to
236 // minimize the time between such processing completing and persisting the updated
237 // ChannelManager, we want to minimize methods blocking on a ChannelManager
238 // generally, and as a fallback place such blocking only immediately before
240 peer_manager.process_events();
242 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
243 // see `await_start`'s use below.
244 let await_start = Instant::now();
245 let updates_available =
246 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
247 let await_time = await_start.elapsed();
249 if updates_available {
250 log_trace!(logger, "Persisting ChannelManager...");
// A manager-persistence failure aborts the thread via `?`; the error is then
// surfaced to the user through `join()`/`stop()`.
251 persister.persist_manager(&*channel_manager)?;
252 log_trace!(logger, "Done persisting ChannelManager.");
254 // Exit the loop if the background processor was requested to stop.
255 if stop_thread.load(Ordering::Acquire) == true {
256 log_trace!(logger, "Terminating background processor.");
259 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
260 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
261 channel_manager.timer_tick_occurred();
262 last_freshness_call = Instant::now();
264 if await_time > Duration::from_secs(1) {
265 // On various platforms, we may be starved of CPU cycles for several reasons.
266 // E.g. on iOS, if we've been in the background, we will be entirely paused.
267 // Similarly, if we're on a desktop platform and the device has been asleep, we
268 // may not get any cycles.
269 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
270 // full second, at which point we assume sockets may have been killed (they
271 // appear to be at least on some platforms, even if it has only been a second).
272 // Note that we have to take care to not get here just because user event
273 // processing was slow at the top of the loop. For example, the sample client
274 // may call Bitcoin Core RPCs during event handling, which very often takes
275 // more than a handful of seconds to complete, and shouldn't disconnect all our
277 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
278 peer_manager.disconnect_all_peers();
279 last_ping_call = Instant::now();
280 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
281 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
282 peer_manager.timer_tick_occurred();
283 last_ping_call = Instant::now();
286 // Note that we want to run a graph prune once not long after startup before
287 // falling back to our usual hourly prunes. This avoids short-lived clients never
288 // pruning their network graph. We run once 60 seconds after startup before
289 // continuing our normal cadence.
290 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
291 // The network graph must not be pruned while rapid sync completion is pending
292 log_trace!(logger, "Assessing prunability of network graph");
// Prefer the rapid-sync graph (only once its initial sync has finished);
// otherwise fall back to the P2P gossip sync's graph, if one was provided.
293 let graph_to_prune = match rapid_gossip_sync.as_ref() {
294 Some(rapid_sync) => {
295 if rapid_sync.is_initial_sync_complete() {
296 Some(rapid_sync.network_graph())
301 None => p2p_gossip_sync.as_ref().map(|sync| sync.network_graph())
304 if let Some(network_graph_reference) = graph_to_prune {
305 network_graph_reference.remove_stale_channels();
// Unlike manager persistence, graph-persistence failures are logged, not fatal.
307 if let Err(e) = persister.persist_graph(network_graph_reference) {
308 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
311 last_prune_call = Instant::now();
314 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
// Periodically persist the scorer (if provided); failures are only logged.
318 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
319 if let Some(ref scorer) = scorer {
320 log_trace!(logger, "Persisting scorer");
321 if let Err(e) = persister.persist_scorer(&scorer) {
322 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
325 last_scorer_persist_call = Instant::now();
329 // After we exit, ensure we persist the ChannelManager one final time - this avoids
330 // some races where users quit while channel updates were in-flight, with
331 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
332 persister.persist_manager(&*channel_manager)?;
334 // Persist Scorer on exit
335 if let Some(ref scorer) = scorer {
336 persister.persist_scorer(&scorer)?;
339 // Persist NetworkGraph on exit
340 if let Some(ref gossip_sync) = p2p_gossip_sync {
341 persister.persist_graph(gossip_sync.network_graph())?;
// Hand back the stop flag and join handle wrapped in a BackgroundProcessor.
346 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
349 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
350 /// [`ChannelManager`].
354 /// This function panics if the background thread has panicked such as while persisting or
357 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
358 pub fn join(mut self) -> Result<(), std::io::Error> {
// `thread_handle` is only ever `None` after a prior join; since this consumes
// `self` it can run at most once, so the handle must still be present here.
359 assert!(self.thread_handle.is_some());
363 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
364 /// [`ChannelManager`].
368 /// This function panics if the background thread has panicked such as while persisting or
371 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
372 pub fn stop(mut self) -> Result<(), std::io::Error> {
// As in `join`, consuming `self` guarantees the handle has not been taken yet.
373 assert!(self.thread_handle.is_some());
374 self.stop_and_join_thread()
// Signal the background thread to exit its loop, then join it (shared by `stop` and `Drop`).
377 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
// `Release` store pairs with the `Acquire` load performed in the processing loop.
378 self.stop_thread.store(true, Ordering::Release);
// Join the background thread if it hasn't been joined yet, returning its Result.
382 fn join_thread(&mut self) -> Result<(), std::io::Error> {
383 match self.thread_handle.take() {
// Propagate the thread's own `Result`; a panic inside the thread propagates
// here via `unwrap()`, as documented on `join`/`stop`.
384 Some(handle) => handle.join().unwrap(),
390 impl Drop for BackgroundProcessor {
// Dropping stops and joins the thread, `unwrap()`ing any persistence error — so a
// failed final persist panics on drop; call `stop()`/`join()` to handle errors instead.
392 self.stop_and_join_thread().unwrap();
398 use bitcoin::blockdata::block::BlockHeader;
399 use bitcoin::blockdata::constants::genesis_block;
400 use bitcoin::blockdata::transaction::{Transaction, TxOut};
401 use bitcoin::network::constants::Network;
402 use lightning::chain::{BestBlock, Confirm, chainmonitor};
403 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
404 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
405 use lightning::chain::transaction::OutPoint;
406 use lightning::get_event_msg;
407 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
408 use lightning::ln::features::{ChannelFeatures, InitFeatures};
409 use lightning::ln::msgs::{ChannelMessageHandler, Init};
410 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
411 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
412 use lightning::util::config::UserConfig;
413 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
414 use lightning::util::ser::Writeable;
415 use lightning::util::test_utils;
416 use lightning::util::persist::KVStorePersister;
417 use lightning_invoice::payment::{InvoicePayer, Retry};
418 use lightning_invoice::utils::DefaultRouter;
419 use lightning_persister::FilesystemPersister;
421 use std::path::PathBuf;
422 use std::sync::{Arc, Mutex};
423 use std::sync::mpsc::SyncSender;
424 use std::time::Duration;
425 use lightning::routing::scoring::{FixedPenaltyScorer};
426 use lightning_rapid_gossip_sync::RapidGossipSync;
427 use super::{BackgroundProcessor, FRESHNESS_TIMER};
429 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
431 #[derive(Clone, Eq, Hash, PartialEq)]
// Minimal no-op socket descriptor, just enough to construct a PeerManager in tests.
432 struct TestDescriptor{}
433 impl SocketDescriptor for TestDescriptor {
// Pretends to send data; the returned byte count is on a line missing from this excerpt.
434 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
438 fn disconnect_socket(&mut self) {}
441 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
444 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
445 p2p_gossip_sync: Option<Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
446 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
447 chain_monitor: Arc<ChainMonitor>,
448 persister: Arc<FilesystemPersister>,
449 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
450 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
451 logger: Arc<test_utils::TestLogger>,
452 best_block: BestBlock,
453 scorer: Arc<Mutex<FixedPenaltyScorer>>,
454 rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
459 let data_dir = self.persister.get_data_dir();
460 match fs::remove_dir_all(data_dir.clone()) {
461 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
468 graph_error: Option<(std::io::ErrorKind, &'static str)>,
469 graph_persistence_notifier: Option<SyncSender<()>>,
470 manager_error: Option<(std::io::ErrorKind, &'static str)>,
471 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
472 filesystem_persister: FilesystemPersister,
// Build a test Persister that delegates to a real FilesystemPersister rooted at
// `data_dir`, with no failure injection configured.
476 fn new(data_dir: String) -> Self {
477 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
478 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
// Builder-style helpers: each consumes `self` and injects a failure (or a
// notification channel) for one specific persistence key.
481 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
482 Self { graph_error: Some((error, message)), ..self }
485 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
486 Self { graph_persistence_notifier: Some(sender), ..self }
489 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
490 Self { manager_error: Some((error, message)), ..self }
493 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
494 Self { scorer_error: Some((error, message)), ..self }
498 impl KVStorePersister for Persister {
// Intercept persistence by key: return the injected error (if any) for that key,
// otherwise fall through to the real filesystem persister.
499 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
500 if key == "manager" {
501 if let Some((error, message)) = self.manager_error {
502 return Err(std::io::Error::new(error, message))
506 if key == "network_graph" {
// Notify the test (if a sender was registered) that graph persistence was attempted.
507 if let Some(sender) = &self.graph_persistence_notifier {
508 sender.send(()).unwrap();
511 if let Some((error, message)) = self.graph_error {
512 return Err(std::io::Error::new(error, message))
// NOTE(review): the `if key == "scorer"` guard enclosing this check appears to be
// on a line missing from this excerpt — confirm against the full file.
517 if let Some((error, message)) = self.scorer_error {
518 return Err(std::io::Error::new(error, message))
// No injected failure matched: delegate to the real filesystem persister.
522 self.filesystem_persister.persist(key, object)
// Join a directory and filename into a single path string for the assertions below.
526 fn get_full_filepath(filepath: String, filename: String) -> String {
527 let mut path = PathBuf::from(filepath);
// NOTE(review): the `path.push(filename)` step is on a line missing from this
// excerpt — confirm the filename is appended before converting to a string.
529 path.to_str().unwrap().to_string()
532 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
533 let mut nodes = Vec::new();
534 for i in 0..num_nodes {
535 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
536 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
537 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
538 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
539 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
540 let seed = [i as u8; 32];
541 let network = Network::Testnet;
542 let genesis_block = genesis_block(network);
543 let now = Duration::from_secs(genesis_block.header.time as u64);
544 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
545 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
546 let best_block = BestBlock::from_genesis(network);
547 let params = ChainParameters { network, best_block };
548 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
549 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
550 let p2p_gossip_sync = Some(Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
551 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
552 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
553 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
554 let rapid_gossip_sync = None;
555 let node = Node { node: manager, p2p_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
559 for i in 0..num_nodes {
560 for j in (i+1)..num_nodes {
561 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
562 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
569 macro_rules! open_channel {
570 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
571 begin_open_channel!($node_a, $node_b, $channel_value);
572 let events = $node_a.node.get_and_clear_pending_events();
573 assert_eq!(events.len(), 1);
574 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
575 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
580 macro_rules! begin_open_channel {
581 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
582 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
583 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
584 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
588 macro_rules! handle_funding_generation_ready {
589 ($event: expr, $channel_value: expr) => {{
591 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
592 assert_eq!(channel_value_satoshis, $channel_value);
593 assert_eq!(user_channel_id, 42);
595 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
596 value: channel_value_satoshis, script_pubkey: output_script.clone(),
598 (temporary_channel_id, tx)
600 _ => panic!("Unexpected event"),
605 macro_rules! end_open_channel {
606 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
607 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
608 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
609 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mine dummy blocks on top of the node's current best block, confirming `tx` and
// notifying both the ChannelManager and the ChainMonitor of each new block.
// NOTE(review): the loop/branch structure that uses `depth` spans lines missing
// from this excerpt — confirm how many connect/update rounds actually run.
613 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
615 let prev_blockhash = node.best_block.block_hash();
616 let height = node.best_block.height() + 1;
617 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
618 let txdata = vec![(0, tx)];
// Advance the test node's view of the chain tip before delivering notifications.
619 node.best_block = BestBlock::new(header.block_hash(), height);
622 node.node.transactions_confirmed(&header, &txdata, height);
623 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
626 node.node.best_block_updated(&header, height);
627 node.chain_monitor.best_block_updated(&header, height);
633 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
// Confirm to ANTI_REORG_DELAY depth so the ChannelMonitor treats the tx as final.
634 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
638 fn test_background_processor() {
639 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
640 // updates. Also test that when new updates are available, the manager signals that it needs
641 // re-persistence and is successfully re-persisted.
642 let nodes = create_nodes(2, "test_background_processor".to_string());
644 // Go through the channel creation process so that each node has something to persist. Since
645 // open_channel consumes events, it must complete before starting BackgroundProcessor to
646 // avoid a race with processing events.
647 let tx = open_channel!(nodes[0], nodes[1], 100000);
649 // Initiate the background processors to watch each node.
650 let data_dir = nodes[0].persister.get_data_dir();
651 let persister = Arc::new(Persister::new(data_dir));
652 let event_handler = |_: &_| {};
653 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
655 macro_rules! check_persisted_data {
656 ($node: expr, $filepath: expr) => {
657 let mut expected_bytes = Vec::new();
659 expected_bytes.clear();
660 match $node.write(&mut expected_bytes) {
662 match std::fs::read($filepath) {
664 if bytes == expected_bytes {
673 Err(e) => panic!("Unexpected error: {}", e)
679 // Check that the initial channel manager data is persisted as expected.
680 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
681 check_persisted_data!(nodes[0].node, filepath.clone());
684 if !nodes[0].node.get_persistence_condvar_value() { break }
687 // Force-close the channel.
688 nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
690 // Check that the force-close updates are persisted.
691 check_persisted_data!(nodes[0].node, filepath.clone());
693 if !nodes[0].node.get_persistence_condvar_value() { break }
696 // Check network graph is persisted
697 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
698 if let Some(ref handler) = nodes[0].p2p_gossip_sync {
699 let network_graph = handler.network_graph();
700 check_persisted_data!(network_graph, filepath.clone());
703 // Check scorer is persisted
704 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
705 check_persisted_data!(nodes[0].scorer, filepath.clone());
707 assert!(bg_processor.stop().is_ok());
711 fn test_timer_tick_called() {
712 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
713 // `FRESHNESS_TIMER`.
714 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
715 let data_dir = nodes[0].persister.get_data_dir();
716 let persister = Arc::new(Persister::new(data_dir));
717 let event_handler = |_: &_| {};
718 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
720 let log_entries = nodes[0].logger.lines.lock().unwrap();
721 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
722 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
723 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
724 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
729 assert!(bg_processor.stop().is_ok());
733 fn test_channel_manager_persist_error() {
734 // Test that if we encounter an error during manager persistence, the thread panics.
735 let nodes = create_nodes(2, "test_persist_error".to_string());
736 open_channel!(nodes[0], nodes[1], 100000);
738 let data_dir = nodes[0].persister.get_data_dir();
739 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
740 let event_handler = |_: &_| {};
741 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
742 match bg_processor.join() {
743 Ok(_) => panic!("Expected error persisting manager"),
745 assert_eq!(e.kind(), std::io::ErrorKind::Other);
746 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that a network-graph persistence failure injected via the test `Persister`
// is reported through `BackgroundProcessor::stop()` (unlike the manager-persist case,
// a graph persistence error does not abort the background thread mid-run).
// NOTE(review): excerpt gaps — the `#[test]` attribute, the `Err(e) =>` match-arm
// opener, and closing braces are elided here; confirm against the full file.
752 fn test_network_graph_persist_error() {
753 // Test that if we encounter an error during network graph persistence, an error gets returned.
754 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
755 let data_dir = nodes[0].persister.get_data_dir();
// Injected failure: persisting the graph returns Other / "test".
756 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
757 let event_handler = |_: &_| {};
758 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// stop() triggers the final persistence pass and must surface the injected error.
760 match bg_processor.stop() {
761 Ok(_) => panic!("Expected error persisting network graph"),
// (The `Err(e) => {` opener for these assertions is elided in this excerpt.)
763 assert_eq!(e.kind(), std::io::ErrorKind::Other);
764 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that a scorer persistence failure injected via the test `Persister`
// is reported through `BackgroundProcessor::stop()` with the injected kind/message.
// NOTE(review): excerpt gaps — `#[test]` attribute, `Err(e) =>` opener, and closing
// braces are elided here; confirm against the full file.
770 fn test_scorer_persist_error() {
771 // Test that if we encounter an error during scorer persistence, an error gets returned.
772 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
773 let data_dir = nodes[0].persister.get_data_dir();
// Injected failure: persisting the scorer returns Other / "test".
774 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
775 let event_handler = |_: &_| {};
776 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// stop() performs the final scorer persistence and must surface the injected error.
778 match bg_processor.stop() {
779 Ok(_) => panic!("Expected error persisting scorer"),
// (The `Err(e) => {` opener for these assertions is elided in this excerpt.)
781 assert_eq!(e.kind(), std::io::ErrorKind::Other);
782 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// End-to-end check that the BackgroundProcessor drives the user-supplied event
// handler: first for a FundingGenerationReady event during channel open, then —
// after a restart of the processor — for a SpendableOutputs event produced by a
// force close. Events are forwarded to the test thread over an mpsc channel and
// awaited with a deadline.
// NOTE(review): excerpt gaps — the `#[test]` attribute, the closing `};` of the
// first event-handler closure, the `let event = receiver` / `match event {`
// openers near the end, and closing braces are elided; confirm against the full file.
788 fn test_background_event_handling() {
789 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
790 let channel_value = 100000;
791 let data_dir = nodes[0].persister.get_data_dir();
792 let persister = Arc::new(Persister::new(data_dir.clone()));
794 // Set up a background event handler for FundingGenerationReady events.
// sync_channel(1): the test thread blocks on recv_timeout until the background
// processor has invoked the handler.
795 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
796 let event_handler = move |event: &Event| {
797 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
799 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
801 // Open a channel and check that the FundingGenerationReady event was handled.
802 begin_open_channel!(nodes[0], nodes[1], channel_value);
803 let (temporary_channel_id, funding_tx) = receiver
804 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
805 .expect("FundingGenerationReady not handled within deadline");
806 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
808 // Confirm the funding transaction.
// Both sides confirm the funding tx, then exchange channel_ready and the
// resulting channel_update messages so the channel becomes usable.
809 confirm_transaction(&mut nodes[0], &funding_tx);
810 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
811 confirm_transaction(&mut nodes[1], &funding_tx);
812 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
813 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
814 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
815 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
816 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor; a fresh one is started below with a handler that
// forwards raw events instead of funding-generation results.
818 assert!(bg_processor.stop().is_ok());
820 // Set up a background event handler for SpendableOutputs events.
821 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
822 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
823 let persister = Arc::new(Persister::new(data_dir));
824 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
826 // Force close the channel and check that the SpendableOutputs event was handled.
827 nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
828 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Bury the commitment tx to BREAKDOWN_TIMEOUT depth so the output becomes spendable.
829 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
// (The `let event = receiver` opener for this receive is elided in this excerpt.)
831 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
832 .expect("SpendableOutputs not handled within deadline");
// (The `match event {` opener is elided in this excerpt.) ChannelClosed may arrive
// instead of / before SpendableOutputs and is tolerated; anything else fails.
834 Event::SpendableOutputs { .. } => {},
835 Event::ChannelClosed { .. } => {},
836 _ => panic!("Unexpected event: {:?}", event),
839 assert!(bg_processor.stop().is_ok());
// Verifies that the BackgroundProcessor periodically persists the scorer, by
// polling the test logger until the "Persisting scorer" log line appears.
// NOTE(review): excerpt gaps — the `#[test]` attribute and the retry-loop
// construct wrapping the log check (plus its `break`/closing braces) are elided
// here; confirm against the full file.
843 fn test_scorer_persistence() {
844 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
845 let data_dir = nodes[0].persister.get_data_dir();
846 let persister = Arc::new(Persister::new(data_dir));
847 let event_handler = |_: &_| {};
848 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// Check the captured log lines (keyed by (module, message)) for the expected entry.
851 let log_entries = nodes[0].logger.lines.lock().unwrap();
852 let expected_log = "Persisting scorer".to_string();
853 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
858 assert!(bg_processor.stop().is_ok());
// Verifies the interaction between rapid gossip sync and network-graph pruning:
// while a rapid sync is pending, the BackgroundProcessor must NOT prune the graph
// (checked via log lines); once `update_network_graph` completes, pruning runs and
// removes the stale channels. Graph persistence is signalled to the test thread via
// an mpsc channel wired into the test `Persister`.
// NOTE(review): excerpt gaps — the `#[test]` attribute, the log-polling loop
// construct, the closing `];` of the payload vec (and possibly trailing payload
// bytes), the `receiver` opener before the recv at line 910, and closing braces
// are elided here; confirm against the full file.
862 fn test_not_pruning_network_graph_until_graph_sync_completion() {
863 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
864 let data_dir = nodes[0].persister.get_data_dir();
865 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// The persister notifies `sender` whenever the network graph is persisted.
866 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
867 let network_graph = nodes[0].network_graph.clone();
868 let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
// Seed the graph with one channel known only via a partial announcement; it has
// no update and is therefore a pruning candidate.
869 let features = ChannelFeatures::empty();
870 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
871 .expect("Failed to update channel from partial announcement");
872 let original_graph_description = network_graph.to_string();
873 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
874 assert_eq!(network_graph.read_only().channels().len(), 1);
876 let event_handler = |_: &_| {};
// Passing `Some(rapid_sync)` marks a rapid gossip sync as in progress, which
// should defer pruning until the sync completes.
877 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));
// Poll the captured logs until both the prune-assessment line and the
// "not pruning" line are observed. (The enclosing loop is elided in this excerpt.)
880 let log_entries = nodes[0].logger.lines.lock().unwrap();
881 let expected_log_a = "Assessing prunability of network graph".to_string();
882 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
883 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
884 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Serialized rapid-gossip-sync payload fed to `update_network_graph` below;
// presumably a fixed testnet snapshot announcing two channels — see the
// assertion after the update.
889 let initialization_input = vec![
890 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
891 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
892 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
893 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
894 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
895 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
896 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
897 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
898 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
899 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
900 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
901 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
902 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Completing the rapid sync unblocks pruning on the next prune timer.
904 rapid_sync.update_network_graph(&initialization_input[..]).unwrap();
906 // this should have added two channels
907 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the post-prune graph persistence notification (receiver opener elided
// in this excerpt).
910 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
911 .expect("Network graph not pruned within deadline");
913 background_processor.stop().unwrap();
915 // all channels should now be pruned
916 assert_eq!(network_graph.read_only().channels().len(), 0);
// Smoke test: an `Arc<InvoicePayer>` can be used directly as the BackgroundProcessor's
// event handler (it implements the event-handling trait), and the processor starts
// and stops cleanly with it installed. No payment is actually made here.
// NOTE(review): excerpt gaps — the `#[test]` attribute and the function's closing
// brace are not visible here; confirm against the full file.
920 fn test_invoice_payer() {
921 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
922 let random_seed_bytes = keys_manager.get_secure_random_bytes();
923 let nodes = create_nodes(2, "test_invoice_payer".to_string());
925 // Initiate the background processors to watch each node.
926 let data_dir = nodes[0].persister.get_data_dir();
927 let persister = Arc::new(Persister::new(data_dir));
// InvoicePayer wired to node 0's graph/scorer, with a no-op event handler and
// up to 2 retry attempts.
928 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
929 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
930 let event_handler = Arc::clone(&invoice_payer);
931 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
932 assert!(bg_processor.stop().is_ok());