1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the persister given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`NetGraphMsgHandler`] is provided to
44 /// [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be relied
47 /// upon, as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
/// Flag shared with the background thread; storing `true` asks the processing loop to exit.
60 stop_thread: Arc<AtomicBool>,
/// Handle of the background thread, yielding the thread's `Result` when joined. It is an
/// `Option` so that joining can `take()` it out from behind `&mut self`.
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
/// Interval, in seconds, between calls to `ChannelManager::timer_tick_occurred` from the
/// background loop. The loop compares whole elapsed seconds with `>`, so the tick fires once
/// strictly more than this many seconds have passed.
65 const FRESHNESS_TIMER: u64 = 60;
// NOTE(review): second definition of `FRESHNESS_TIMER` with a one-second interval; presumably
// `cfg`-gated for test builds like `PING_TIMER` below - confirm against the full file.
67 const FRESHNESS_TIMER: u64 = 1;
/// Interval, in seconds, between calls to `PeerManager::timer_tick_occurred` in non-test builds
/// compiled without debug assertions.
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
// NOTE(review): one-second `PING_TIMER`; presumably the `cfg(test)` counterpart of the two
// definitions above - confirm against the full file.
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
/// Delay, in seconds, before the first network graph prune after startup. The loop uses this
/// value instead of `NETWORK_PRUNE_TIMER` while `have_pruned` is still false.
83 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
// NOTE(review): one-second `FIRST_NETWORK_PRUNE_TIMER`; presumably the test-build variant -
// confirm against the full file.
85 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
88 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
89 struct DecoratingEventHandler<
91 N: Deref<Target = NetGraphMsgHandler<G, A, L>>,
92 G: Deref<Target = NetworkGraph>,
96 where A::Target: chain::Access, L::Target: Logger {
98 net_graph_msg_handler: Option<N>,
103 N: Deref<Target = NetGraphMsgHandler<G, A, L>>,
104 G: Deref<Target = NetworkGraph>,
107 > EventHandler for DecoratingEventHandler<E, N, G, A, L>
108 where A::Target: chain::Access, L::Target: Logger {
/// Passes `event` to the optional `NetGraphMsgHandler` first (when one was supplied) and then
/// to the wrapped `EventHandler`, so both observe every event.
109 fn handle_event(&self, event: &Event) {
110 if let Some(event_handler) = &self.net_graph_msg_handler {
111 event_handler.handle_event(event);
113 self.event_handler.handle_event(event);
117 impl BackgroundProcessor {
118 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
121 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
122 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
123 /// either [`join`] or [`stop`].
125 /// # Data Persistence
127 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
128 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
129 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
130 /// provided implementation.
132 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
133 /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
134 /// for LDK's provided implementation.
136 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
137 /// error or call [`join`] and handle any error that may arise. For the latter case,
138 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
142 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
143 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
144 /// functionality implemented by other handlers.
145 /// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
147 /// # Rapid Gossip Sync
149 /// If rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
150 /// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
151 /// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
153 /// [top-level documentation]: BackgroundProcessor
154 /// [`join`]: Self::join
155 /// [`stop`]: Self::stop
156 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
157 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
158 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
159 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
160 /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
161 /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
164 Signer: 'static + Sign,
165 CA: 'static + Deref + Send + Sync,
166 CF: 'static + Deref + Send + Sync,
167 CW: 'static + Deref + Send + Sync,
168 T: 'static + Deref + Send + Sync,
169 K: 'static + Deref + Send + Sync,
170 F: 'static + Deref + Send + Sync,
171 G: 'static + Deref<Target = NetworkGraph> + Send + Sync,
172 L: 'static + Deref + Send + Sync,
173 P: 'static + Deref + Send + Sync,
174 Descriptor: 'static + SocketDescriptor + Send + Sync,
175 CMH: 'static + Deref + Send + Sync,
176 RMH: 'static + Deref + Send + Sync,
177 EH: 'static + EventHandler + Send,
178 PS: 'static + Deref + Send,
179 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
180 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
181 NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
182 UMH: 'static + Deref + Send + Sync,
183 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
184 S: 'static + Deref<Target = SC> + Send + Sync,
185 SC: WriteableScore<'a>,
186 RGS: 'static + Deref<Target = RapidGossipSync<G>> + Send
188 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
189 net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>,
190 rapid_gossip_sync: Option<RGS>
193 CA::Target: 'static + chain::Access,
194 CF::Target: 'static + chain::Filter,
195 CW::Target: 'static + chain::Watch<Signer>,
196 T::Target: 'static + BroadcasterInterface,
197 K::Target: 'static + KeysInterface<Signer = Signer>,
198 F::Target: 'static + FeeEstimator,
199 L::Target: 'static + Logger,
200 P::Target: 'static + Persist<Signer>,
201 CMH::Target: 'static + ChannelMessageHandler,
202 RMH::Target: 'static + RoutingMessageHandler,
203 UMH::Target: 'static + CustomMessageHandler,
204 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
206 let stop_thread = Arc::new(AtomicBool::new(false));
207 let stop_thread_clone = stop_thread.clone();
208 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
209 let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };
211 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
212 channel_manager.timer_tick_occurred();
214 let mut last_freshness_call = Instant::now();
215 let mut last_ping_call = Instant::now();
216 let mut last_prune_call = Instant::now();
217 let mut have_pruned = false;
220 channel_manager.process_pending_events(&event_handler);
221 chain_monitor.process_pending_events(&event_handler);
223 // Note that the PeerManager::process_events may block on ChannelManager's locks,
224 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
225 // we want to ensure we get into `persist_manager` as quickly as we can, especially
226 // without running the normal event processing above and handing events to users.
228 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
229 // processing a message effectively at any point during this loop. In order to
230 // minimize the time between such processing completing and persisting the updated
231 // ChannelManager, we want to minimize methods blocking on a ChannelManager
232 // generally, and as a fallback place such blocking only immediately before
234 peer_manager.process_events();
236 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
237 // see `await_start`'s use below.
238 let await_start = Instant::now();
239 let updates_available =
240 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
241 let await_time = await_start.elapsed();
243 if updates_available {
244 log_trace!(logger, "Persisting ChannelManager...");
245 persister.persist_manager(&*channel_manager)?;
246 log_trace!(logger, "Done persisting ChannelManager.");
248 // Exit the loop if the background processor was requested to stop.
249 if stop_thread.load(Ordering::Acquire) == true {
250 log_trace!(logger, "Terminating background processor.");
253 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
254 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
255 channel_manager.timer_tick_occurred();
256 last_freshness_call = Instant::now();
258 if await_time > Duration::from_secs(1) {
259 // On various platforms, we may be starved of CPU cycles for several reasons.
260 // E.g. on iOS, if we've been in the background, we will be entirely paused.
261 // Similarly, if we're on a desktop platform and the device has been asleep, we
262 // may not get any cycles.
263 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
264 // full second, at which point we assume sockets may have been killed (they
265 // appear to be at least on some platforms, even if it has only been a second).
266 // Note that we have to take care to not get here just because user event
267 // processing was slow at the top of the loop. For example, the sample client
268 // may call Bitcoin Core RPCs during event handling, which very often takes
269 // more than a handful of seconds to complete, and shouldn't disconnect all our
271 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
272 peer_manager.disconnect_all_peers();
273 last_ping_call = Instant::now();
274 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
275 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
276 peer_manager.timer_tick_occurred();
277 last_ping_call = Instant::now();
280 // Note that we want to run a graph prune once not long after startup before
281 // falling back to our usual hourly prunes. This avoids short-lived clients never
282 // pruning their network graph. We run once 60 seconds after startup before
283 // continuing our normal cadence.
284 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
285 // The network graph must not be pruned while rapid sync completion is pending
286 log_trace!(logger, "Assessing prunability of network graph");
287 let graph_to_prune = match rapid_gossip_sync.as_ref() {
288 Some(rapid_sync) => {
289 if rapid_sync.is_initial_sync_complete() {
290 Some(rapid_sync.network_graph())
295 None => net_graph_msg_handler.as_ref().map(|handler| handler.network_graph())
298 if let Some(network_graph_reference) = graph_to_prune {
299 network_graph_reference.remove_stale_channels();
301 if let Err(e) = persister.persist_graph(network_graph_reference) {
302 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
305 last_prune_call = Instant::now();
308 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
310 if let Some(ref scorer) = scorer {
311 log_trace!(logger, "Persisting scorer");
312 if let Err(e) = persister.persist_scorer(&scorer) {
313 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
319 // After we exit, ensure we persist the ChannelManager one final time - this avoids
320 // some races where users quit while channel updates were in-flight, with
321 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
322 persister.persist_manager(&*channel_manager)?;
324 // Persist Scorer on exit
325 if let Some(ref scorer) = scorer {
326 persister.persist_scorer(&scorer)?;
329 // Persist NetworkGraph on exit
330 if let Some(ref handler) = net_graph_msg_handler {
331 persister.persist_graph(handler.network_graph())?;
336 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
339 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
340 /// [`ChannelManager`].
344 /// This function panics if the background thread has panicked such as while persisting or
347 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
348 pub fn join(mut self) -> Result<(), std::io::Error> {
349 assert!(self.thread_handle.is_some());
353 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
354 /// [`ChannelManager`], the scorer, or the network graph.
358 /// This function panics if the background thread has panicked such as while persisting or
361 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
362 pub fn stop(mut self) -> Result<(), std::io::Error> {
// Sanity check that the thread has not already been joined: the handle is `take()`n when the
// thread is joined, leaving `None` behind.
363 assert!(self.thread_handle.is_some());
364 self.stop_and_join_thread()
367 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
368 self.stop_thread.store(true, Ordering::Release);
372 fn join_thread(&mut self) -> Result<(), std::io::Error> {
373 match self.thread_handle.take() {
374 Some(handle) => handle.join().unwrap(),
380 impl Drop for BackgroundProcessor {
382 self.stop_and_join_thread().unwrap();
388 use bitcoin::blockdata::block::BlockHeader;
389 use bitcoin::blockdata::constants::genesis_block;
390 use bitcoin::blockdata::transaction::{Transaction, TxOut};
391 use bitcoin::network::constants::Network;
392 use lightning::chain::{BestBlock, Confirm, chainmonitor};
393 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
394 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
395 use lightning::chain::transaction::OutPoint;
396 use lightning::get_event_msg;
397 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
398 use lightning::ln::features::{ChannelFeatures, InitFeatures};
399 use lightning::ln::msgs::{ChannelMessageHandler, Init};
400 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
401 use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
402 use lightning::util::config::UserConfig;
403 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
404 use lightning::util::ser::Writeable;
405 use lightning::util::test_utils;
406 use lightning::util::persist::KVStorePersister;
407 use lightning_invoice::payment::{InvoicePayer, Retry};
408 use lightning_invoice::utils::DefaultRouter;
409 use lightning_persister::FilesystemPersister;
411 use std::path::PathBuf;
412 use std::sync::{Arc, Mutex};
413 use std::sync::mpsc::SyncSender;
414 use std::time::Duration;
415 use lightning::routing::scoring::{FixedPenaltyScorer};
416 use lightning_rapid_gossip_sync::RapidGossipSync;
417 use super::{BackgroundProcessor, FRESHNESS_TIMER};
419 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
/// Stateless socket descriptor used to instantiate `PeerManager` in the tests.
421 #[derive(Clone, Eq, Hash, PartialEq)]
422 struct TestDescriptor{}
423 impl SocketDescriptor for TestDescriptor {
424 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
428 fn disconnect_socket(&mut self) {}
/// Concrete `chainmonitor::ChainMonitor` instantiation shared by the test nodes: `InMemorySigner`
/// keys, the `test_utils` chain source, broadcaster, fee estimator and logger, and a
/// `FilesystemPersister` for monitor persistence.
431 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
434 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
435 net_graph_msg_handler: Option<Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
436 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
437 chain_monitor: Arc<ChainMonitor>,
438 persister: Arc<FilesystemPersister>,
439 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
440 network_graph: Arc<NetworkGraph>,
441 logger: Arc<test_utils::TestLogger>,
442 best_block: BestBlock,
443 scorer: Arc<Mutex<FixedPenaltyScorer>>,
444 rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>>
449 let data_dir = self.persister.get_data_dir();
450 match fs::remove_dir_all(data_dir.clone()) {
451 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
458 graph_error: Option<(std::io::ErrorKind, &'static str)>,
459 graph_persistence_notifier: Option<SyncSender<()>>,
460 manager_error: Option<(std::io::ErrorKind, &'static str)>,
461 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
462 filesystem_persister: FilesystemPersister,
/// Builds a `Persister` with no injected errors and no graph-persistence notifier, wrapping a
/// `FilesystemPersister` constructed from `data_dir`.
466 fn new(data_dir: String) -> Self {
// NOTE(review): `data_dir` is owned and not used again in the visible lines, so this
// `.clone()` looks redundant - confirm and pass it by value.
467 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
468 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
/// Returns `self` with `graph_error` set, leaving the other fields untouched. `persist` turns
/// this into an `std::io::Error` of the given kind and message for the `"network_graph"` key.
471 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
472 Self { graph_error: Some((error, message)), ..self }
/// Returns `self` with `graph_persistence_notifier` set, leaving the other fields untouched.
/// `persist` sends `()` on `sender` when the `"network_graph"` key is persisted and unwraps the
/// send result, so a failed send panics.
475 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
476 Self { graph_persistence_notifier: Some(sender), ..self }
/// Returns `self` with `manager_error` set, leaving the other fields untouched. `persist` turns
/// this into an `std::io::Error` of the given kind and message for the `"manager"` key.
479 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
480 Self { manager_error: Some((error, message)), ..self }
/// Returns `self` with `scorer_error` set, leaving the other fields untouched. `persist` turns
/// this into an `std::io::Error` of the given kind and message.
// NOTE(review): the key check guarding `scorer_error` in `persist` is not visible here;
// presumably it applies to the scorer key only - confirm.
483 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
484 Self { scorer_error: Some((error, message)), ..self }
488 impl KVStorePersister for Persister {
489 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
490 if key == "manager" {
491 if let Some((error, message)) = self.manager_error {
492 return Err(std::io::Error::new(error, message))
496 if key == "network_graph" {
497 if let Some(sender) = &self.graph_persistence_notifier {
498 sender.send(()).unwrap();
501 if let Some((error, message)) = self.graph_error {
502 return Err(std::io::Error::new(error, message))
507 if let Some((error, message)) = self.scorer_error {
508 return Err(std::io::Error::new(error, message))
512 self.filesystem_persister.persist(key, object)
516 fn get_full_filepath(filepath: String, filename: String) -> String {
517 let mut path = PathBuf::from(filepath);
519 path.to_str().unwrap().to_string()
522 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
523 let mut nodes = Vec::new();
524 for i in 0..num_nodes {
525 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
526 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
527 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
528 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
529 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
530 let seed = [i as u8; 32];
531 let network = Network::Testnet;
532 let genesis_block = genesis_block(network);
533 let now = Duration::from_secs(genesis_block.header.time as u64);
534 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
535 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
536 let best_block = BestBlock::from_genesis(network);
537 let params = ChainParameters { network, best_block };
538 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
539 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash()));
540 let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
541 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
542 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
543 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
544 let rapid_gossip_sync = None;
545 let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
549 for i in 0..num_nodes {
550 for j in (i+1)..num_nodes {
551 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
552 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
559 macro_rules! open_channel {
560 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
561 begin_open_channel!($node_a, $node_b, $channel_value);
562 let events = $node_a.node.get_and_clear_pending_events();
563 assert_eq!(events.len(), 1);
564 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
565 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
570 macro_rules! begin_open_channel {
571 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
572 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
573 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
574 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
578 macro_rules! handle_funding_generation_ready {
579 ($event: expr, $channel_value: expr) => {{
581 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
582 assert_eq!(channel_value_satoshis, $channel_value);
583 assert_eq!(user_channel_id, 42);
585 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
586 value: channel_value_satoshis, script_pubkey: output_script.clone(),
588 (temporary_channel_id, tx)
590 _ => panic!("Unexpected event"),
595 macro_rules! end_open_channel {
596 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
597 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
598 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
599 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
603 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
605 let prev_blockhash = node.best_block.block_hash();
606 let height = node.best_block.height() + 1;
607 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
608 let txdata = vec![(0, tx)];
609 node.best_block = BestBlock::new(header.block_hash(), height);
612 node.node.transactions_confirmed(&header, &txdata, height);
613 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
616 node.node.best_block_updated(&header, height);
617 node.chain_monitor.best_block_updated(&header, height);
/// Confirms `tx` on `node` by delegating to `confirm_transaction_depth` with a depth of
/// `ANTI_REORG_DELAY`.
623 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
624 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
628 fn test_background_processor() {
629 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
630 // updates. Also test that when new updates are available, the manager signals that it needs
631 // re-persistence and is successfully re-persisted.
632 let nodes = create_nodes(2, "test_background_processor".to_string());
634 // Go through the channel creation process so that each node has something to persist. Since
635 // open_channel consumes events, it must complete before starting BackgroundProcessor to
636 // avoid a race with processing events.
637 let tx = open_channel!(nodes[0], nodes[1], 100000);
639 // Initiate the background processors to watch each node.
640 let data_dir = nodes[0].persister.get_data_dir();
641 let persister = Arc::new(Persister::new(data_dir));
642 let event_handler = |_: &_| {};
643 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
645 macro_rules! check_persisted_data {
646 ($node: expr, $filepath: expr) => {
647 let mut expected_bytes = Vec::new();
649 expected_bytes.clear();
650 match $node.write(&mut expected_bytes) {
652 match std::fs::read($filepath) {
654 if bytes == expected_bytes {
663 Err(e) => panic!("Unexpected error: {}", e)
669 // Check that the initial channel manager data is persisted as expected.
670 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
671 check_persisted_data!(nodes[0].node, filepath.clone());
674 if !nodes[0].node.get_persistence_condvar_value() { break }
677 // Force-close the channel.
678 nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
680 // Check that the force-close updates are persisted.
681 check_persisted_data!(nodes[0].node, filepath.clone());
683 if !nodes[0].node.get_persistence_condvar_value() { break }
686 // Check network graph is persisted
687 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
688 if let Some(ref handler) = nodes[0].net_graph_msg_handler {
689 let network_graph = handler.network_graph();
690 check_persisted_data!(network_graph, filepath.clone());
693 // Check scorer is persisted
694 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
695 check_persisted_data!(nodes[0].scorer, filepath.clone());
697 assert!(bg_processor.stop().is_ok());
701 fn test_timer_tick_called() {
702 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` are called once their
703 // respective timers (`FRESHNESS_TIMER` and `PING_TIMER`) elapse.
704 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
705 let data_dir = nodes[0].persister.get_data_dir();
706 let persister = Arc::new(Persister::new(data_dir));
707 let event_handler = |_: &_| {};
708 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
710 let log_entries = nodes[0].logger.lines.lock().unwrap();
711 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
712 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
713 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
714 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
719 assert!(bg_processor.stop().is_ok());
723 fn test_channel_manager_persist_error() {
724 // Test that if we encounter an error during manager persistence, the error is returned by `join`.
725 let nodes = create_nodes(2, "test_persist_error".to_string());
726 open_channel!(nodes[0], nodes[1], 100000);
728 let data_dir = nodes[0].persister.get_data_dir();
729 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
730 let event_handler = |_: &_| {};
731 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
732 match bg_processor.join() {
733 Ok(_) => panic!("Expected error persisting manager"),
735 assert_eq!(e.kind(), std::io::ErrorKind::Other);
736 assert_eq!(e.get_ref().unwrap().to_string(), "test");
742 fn test_network_graph_persist_error() {
743 // Test that if we encounter an error during network graph persistence, an error gets returned.
744 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
// Persister configured (via `with_graph_error`) with ErrorKind::Other and the message "test".
745 let data_dir = nodes[0].persister.get_data_dir();
746 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
// No-op event handler: this test only cares about the persistence result.
747 let event_handler = |_: &_| {};
748 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// The error is expected to come back from `stop()`; a successful stop fails the test.
750 match bg_processor.stop() {
751 Ok(_) => panic!("Expected error persisting network graph"),
// The surfaced error must carry the kind and message configured on the persister above.
753 assert_eq!(e.kind(), std::io::ErrorKind::Other);
754 assert_eq!(e.get_ref().unwrap().to_string(), "test");
760 fn test_scorer_persist_error() {
761 // Test that if we encounter an error during scorer persistence, an error gets returned.
762 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
// Persister configured (via `with_scorer_error`) with ErrorKind::Other and the message "test".
763 let data_dir = nodes[0].persister.get_data_dir();
764 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
// No-op event handler: this test only cares about the persistence result.
765 let event_handler = |_: &_| {};
// The scorer is passed as `Some(..)`, so the processor has a scorer to persist.
766 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// The error is expected to come back from `stop()`; a successful stop fails the test.
768 match bg_processor.stop() {
769 Ok(_) => panic!("Expected error persisting scorer"),
// The surfaced error must carry the kind and message configured on the persister above.
771 assert_eq!(e.kind(), std::io::ErrorKind::Other);
772 assert_eq!(e.get_ref().unwrap().to_string(), "test");
778 fn test_background_event_handling() {
// Test that events are delivered to the handler given to `BackgroundProcessor::start`: first
// while opening a channel, then (with a second processor and handler) after a force close.
779 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
780 let channel_value = 100000;
781 let data_dir = nodes[0].persister.get_data_dir();
782 let persister = Arc::new(Persister::new(data_dir.clone()));
784 // Set up a background event handler for FundingGenerationReady events.
// The handler forwards the output of `handle_funding_generation_ready!` to the test thread
// over a bounded channel of capacity 1.
785 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
786 let event_handler = move |event: &Event| {
787 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
789 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
791 // Open a channel and check that the FundingGenerationReady event was handled.
792 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Wait up to EVENT_DEADLINE seconds for the handler's output, which is destructured into the
// temporary channel id and the funding transaction.
793 let (temporary_channel_id, funding_tx) = receiver
794 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
795 .expect("FundingGenerationReady not handled within deadline");
796 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
798 // Confirm the funding transaction.
// After confirmation each node emits SendChannelReady; delivering it to the peer makes the
// peer emit SendChannelUpdate. The channel updates themselves are not inspected.
799 confirm_transaction(&mut nodes[0], &funding_tx);
800 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
801 confirm_transaction(&mut nodes[1], &funding_tx);
802 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
803 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
804 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
805 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
806 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before starting a second one with a different handler.
808 assert!(bg_processor.stop().is_ok());
810 // Set up a background event handler for SpendableOutputs events.
// This handler clones every event it receives and sends the clone to the test thread.
811 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
812 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
813 let persister = Arc::new(Persister::new(data_dir));
814 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
816 // Force close the channel and check that the SpendableOutputs event was handled.
817 nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction (treated as the commitment transaction) and
// confirm it; presumably BREAKDOWN_TIMEOUT is the confirmation depth -- TODO confirm.
818 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
819 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
821 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
822 .expect("SpendableOutputs not handled within deadline");
// NOTE(review): ChannelClosed is accepted as well as SpendableOutputs, so this match alone
// does not prove that a SpendableOutputs event was the one received.
824 Event::SpendableOutputs { .. } => {},
825 Event::ChannelClosed { .. } => {},
826 _ => panic!("Unexpected event: {:?}", event),
829 assert!(bg_processor.stop().is_ok());
833 fn test_scorer_persistence() {
// Test that a running BackgroundProcessor logs "Persisting scorer" and then stops cleanly.
834 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
835 let data_dir = nodes[0].persister.get_data_dir();
836 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: events are irrelevant to this test.
837 let event_handler = |_: &_| {};
838 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// Look in the test logger for the "Persisting scorer" line recorded under the
// `lightning_background_processor` module.
841 let log_entries = nodes[0].logger.lines.lock().unwrap();
842 let expected_log = "Persisting scorer".to_string();
843 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
848 assert!(bg_processor.stop().is_ok());
852 fn test_not_pruning_network_graph_until_graph_sync_completion() {
// Test that, when a RapidGossipSync is supplied, the processor logs that it is not pruning the
// network graph, and that after the rapid sync data is applied and the processor is stopped
// the graph ends up with no channels.
853 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
854 let data_dir = nodes[0].persister.get_data_dir();
// The persister is handed the sender half so the test can wait on `receiver` below; presumably
// it signals each time the network graph is persisted -- TODO confirm.
855 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
856 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
857 let network_graph = nodes[0].network_graph.clone();
858 let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
// Seed the graph with a single channel (id 42) between the two test nodes, built from a
// partial announcement, and check that it is the only channel present.
859 let features = ChannelFeatures::empty();
860 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
861 .expect("Failed to update channel from partial announcement");
862 let original_graph_description = network_graph.to_string();
863 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
864 assert_eq!(network_graph.read_only().channels().len(), 1);
// Start the processor with the RapidGossipSync created above rather than
// `nodes[0].rapid_gossip_sync`.
866 let event_handler = |_: &_| {};
867 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));
// Look for both log lines showing that prunability was assessed and pruning was skipped.
870 let log_entries = nodes[0].logger.lines.lock().unwrap();
871 let expected_log_a = "Assessing prunability of network graph".to_string();
872 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
873 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
874 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw input for `RapidGossipSync::update_network_graph`; the first three bytes are ASCII "LDK".
879 let initialization_input = vec![
880 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
881 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
882 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
883 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
884 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
885 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
886 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
887 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
888 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
889 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
890 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
891 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
892 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Apply the rapid gossip sync data to the shared network graph.
894 rapid_sync.update_network_graph(&initialization_input[..]).unwrap();
896 // This should have added two channels on top of the one seeded above (1 + 2 = 3).
897 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for a notification on `receiver`, allowing up to five times FIRST_NETWORK_PRUNE_TIMER
// (in seconds) before failing.
900 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
901 .expect("Network graph not pruned within deadline");
903 background_processor.stop().unwrap();
905 // All channels should now be pruned.
906 assert_eq!(network_graph.read_only().channels().len(), 0);
910 fn test_invoice_payer() {
// Test that an `Arc<InvoicePayer>` can be passed to `BackgroundProcessor::start` as the event
// handler, and that the processor then stops without error.
911 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
912 let random_seed_bytes = keys_manager.get_secure_random_bytes();
913 let nodes = create_nodes(2, "test_invoice_payer".to_string());
915 // Start a background processor for nodes[0], using an InvoicePayer as its event handler.
916 let data_dir = nodes[0].persister.get_data_dir();
917 let persister = Arc::new(Persister::new(data_dir));
// The InvoicePayer is built from nodes[0]'s channel manager, scorer and logger plus a
// DefaultRouter over its network graph; it wraps a no-op inner handler and uses
// Retry::Attempts(2).
918 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
919 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
920 let event_handler = Arc::clone(&invoice_payer);
921 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
// No payment is attempted in the lines above; only a clean stop is asserted.
922 assert!(bg_processor.stop().is_ok());