/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
/// at the appropriate intervals.
+/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`NetGraphMsgHandler`] is provided to
+/// [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
#[cfg(test)]
const PING_TIMER: u64 = 1;
+/// Prune the network graph of stale entries hourly.
+const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
+
/// Trait which handles persisting a [`ChannelManager`] to disk.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// functionality implemented by other handlers.
/// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
///
- /// [top-level documentation]: Self
+ /// [top-level documentation]: BackgroundProcessor
/// [`join`]: Self::join
/// [`stop`]: Self::stop
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
- let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler };
+ let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
channel_manager.timer_tick_occurred();
let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
+ let mut last_prune_call = Instant::now();
+ let mut have_pruned = false;
+
loop {
peer_manager.process_events();
channel_manager.process_pending_events(&event_handler);
let updates_available =
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
if updates_available {
+ log_trace!(logger, "Persisting ChannelManager...");
persister.persist_manager(&*channel_manager)?;
+ log_trace!(logger, "Done persisting ChannelManager.");
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) == true {
log_trace!(logger, "Terminating background processor.");
- return Ok(());
+ break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
}
+
+ // Note that we want to run a graph prune once not long after startup before
+ // falling back to our usual hourly prunes. This avoids short-lived clients never
+ // pruning their network graph. We run once 60 seconds after startup before
+ // continuing our normal cadence.
+ if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
+ if let Some(ref handler) = net_graph_msg_handler {
+ log_trace!(logger, "Pruning network graph of stale entries");
+ handler.network_graph().remove_stale_channels();
+ last_prune_call = Instant::now();
+ have_pruned = true;
+ }
+ }
}
+ // After we exit, ensure we persist the ChannelManager one final time - this avoids
+ // some races where users quit while channel updates were in-flight, with
+ // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+ persister.persist_manager(&*channel_manager)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
use lightning::ln::features::InitFeatures;
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
- use lightning::routing::scorer::Scorer;
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
macro_rules! check_persisted_data {
($node: expr, $filepath: expr, $expected_bytes: expr) => {
- match $node.write(&mut $expected_bytes) {
- Ok(()) => {
- loop {
+ loop {
+ $expected_bytes.clear();
+ match $node.write(&mut $expected_bytes) {
+ Ok(()) => {
match std::fs::read($filepath) {
Ok(bytes) => {
if bytes == $expected_bytes {
},
Err(_) => continue
}
- }
- },
- Err(e) => panic!("Unexpected error: {}", e)
+ },
+ Err(e) => panic!("Unexpected error: {}", e)
+ }
}
}
}
let data_dir = nodes[0].persister.get_data_dir();
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger));
- let scorer = Arc::new(Mutex::new(Scorer::default()));
+ let scorer = Arc::new(Mutex::new(test_utils::TestScorer::default()));
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
let event_handler = Arc::clone(&invoice_payer);
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());