use std::ops::Deref;
#[cfg(feature = "futures")]
-use futures::{select, future::FutureExt};
+use futures_util::{select_biased, future::FutureExt};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
/// at the appropriate intervals.
-/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
-/// is provided to [`BackgroundProcessor::start`]).
+/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
+/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
// The network graph must not be pruned while rapid sync completion is pending
log_trace!($logger, "Assessing prunability of network graph");
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
- network_graph.remove_stale_channels();
+ network_graph.remove_stale_channels_and_tracking();
if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
-/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
-/// future which outputs false, the loop will exit and this function's future will complete.
+/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
+/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
+ OMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
- PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+ PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool>,
L::Target: 'static + Logger,
P::Target: 'static + Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
+ OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
- let mut should_continue = true;
+ let mut should_break = true;
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
- gossip_sync, peer_manager, logger, scorer, should_continue, {
- select! {
+ gossip_sync, peer_manager, logger, scorer, should_break, {
+ select_biased! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
- cont = sleeper(Duration::from_millis(100)).fuse() => {
- should_continue = cont;
+ exit = sleeper(Duration::from_millis(100)).fuse() => {
+ should_break = exit;
false
}
}
use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
- use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
- use lightning::ln::features::{ChannelFeatures, InitFeatures};
+ use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
+ use lightning::ln::features::ChannelFeatures;
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
- #[derive(Clone, Eq, Hash, PartialEq)]
+ #[derive(Clone, Hash, PartialEq, Eq)]
struct TestDescriptor{}
impl SocketDescriptor for TestDescriptor {
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
for i in 0..num_nodes {
for j in (i+1)..num_nodes {
- nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
- nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+ nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
+ nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
}
}
macro_rules! begin_open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
- $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
- $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
+ $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
+ $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
}}
}
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
- let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
- let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
+ let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
+ let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
let event_handler = Arc::clone(&invoice_payer);
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
assert!(bg_processor.stop().is_ok());