use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::chain::keysinterface::{Sign, KeysInterface};
+use lightning::chain::keysinterface::KeysInterface;
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use std::ops::Deref;
#[cfg(feature = "futures")]
-use futures::{select, future::FutureExt};
+use futures_util::{select_biased, future::FutureExt};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
/// at the appropriate intervals.
-/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
-/// is provided to [`BackgroundProcessor::start`]).
+/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
+/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
// The network graph must not be pruned while rapid sync completion is pending
log_trace!($logger, "Assessing prunability of network graph");
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
- network_graph.remove_stale_channels();
+ network_graph.remove_stale_channels_and_tracking();
if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
- Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
+ OMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
- CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+ M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
- PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+ PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool>,
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
- CW::Target: 'static + chain::Watch<Signer>,
+ CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
T::Target: 'static + BroadcasterInterface,
- K::Target: 'static + KeysInterface<Signer = Signer>,
+ K::Target: 'static + KeysInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
- P::Target: 'static + Persist<Signer>,
+ P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
CMH::Target: 'static + ChannelMessageHandler,
+ OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
- PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+ PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
{
let mut should_continue = true;
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
gossip_sync, peer_manager, logger, scorer, should_continue, {
- select! {
+ select_biased! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
cont = sleeper(Duration::from_millis(100)).fuse() => {
should_continue = cont;
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
pub fn start<
'a,
- Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
- CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+ M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
- CW::Target: 'static + chain::Watch<Signer>,
+ CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
T::Target: 'static + BroadcasterInterface,
- K::Target: 'static + KeysInterface<Signer = Signer>,
+ K::Target: 'static + KeysInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
- P::Target: 'static + Persist<Signer>,
+ P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
CMH::Target: 'static + ChannelMessageHandler,
OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
- PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+ PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
- #[derive(Clone, Eq, Hash, PartialEq)]
+ #[derive(Clone, Hash, PartialEq, Eq)]
struct TestDescriptor{}
impl SocketDescriptor for TestDescriptor {
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
// Set up a background event handler for FundingGenerationReady events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
- let event_handler = move |event: &Event| {
- sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
+ let event_handler = move |event: &Event| match event {
+ Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
+ Event::ChannelReady { .. } => {},
+ _ => panic!("Unexpected event: {:?}", event),
};
+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Open a channel and check that the FundingGenerationReady event was handled.
// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
- let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
+ let event_handler = move |event: &Event| match event {
+ Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
+ Event::ChannelReady { .. } => {},
+ Event::ChannelClosed { .. } => {},
+ _ => panic!("Unexpected event: {:?}", event),
+ };
let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
+
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
- .expect("SpendableOutputs not handled within deadline");
+ .expect("Events not handled within deadline");
match event {
Event::SpendableOutputs { .. } => {},
- Event::ChannelClosed { .. } => {},
_ => panic!("Unexpected event: {:?}", event),
}