//! running properly, and (2) either can or should be run in the background. See docs for
//! [`BackgroundProcessor`] for more details on the nitty-gritty.
-// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
-#![deny(broken_intra_doc_links)]
-#![deny(private_intra_doc_links)]
+#![deny(rustdoc::broken_intra_doc_links)]
+#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
use lightning::events::{Event, PathFailure};
#[cfg(feature = "std")]
-use lightning::events::{EventHandler, EventsProvider};
-use lightning::ln::channelmanager::ChannelManager;
+use lightning::events::EventHandler;
+#[cfg(any(feature = "std", feature = "futures"))]
+use lightning::events::EventsProvider;
+
+use lightning::ln::channelmanager::AChannelManager;
+use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::UtxoLookup;
-use lightning::routing::router::Router;
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
+/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
#[cfg(test)]
const PING_TIMER: u64 = 1;
+#[cfg(not(test))]
+const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
+#[cfg(test)]
+const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
+
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
#[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
/// to persist.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
- scorer: &'a S, event: &Event
+ scorer: &'a S, event: &Event, duration_since_epoch: Duration,
) -> bool {
match event {
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
let mut score = scorer.write_lock();
- score.payment_path_failed(path, *scid);
+ score.payment_path_failed(path, *scid, duration_since_epoch);
},
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
// Reached if the destination explicitly failed it back. We treat this as a successful probe
// because the payment made it all the way to the destination with sufficient liquidity.
let mut score = scorer.write_lock();
- score.probe_successful(path);
+ score.probe_successful(path, duration_since_epoch);
},
Event::PaymentPathSuccessful { path, .. } => {
let mut score = scorer.write_lock();
- score.payment_path_successful(path);
+ score.payment_path_successful(path, duration_since_epoch);
},
Event::ProbeSuccessful { path, .. } => {
let mut score = scorer.write_lock();
- score.probe_successful(path);
+ score.probe_successful(path, duration_since_epoch);
},
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
let mut score = scorer.write_lock();
- score.probe_failed(path, *scid);
+ score.probe_failed(path, *scid, duration_since_epoch);
},
_ => return false,
}
}
macro_rules! define_run_body {
- ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
- $channel_manager: ident, $process_channel_manager_events: expr,
- $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
- $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
- $check_slow_await: expr)
- => { {
+ (
+ $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
+ $channel_manager: ident, $process_channel_manager_events: expr,
+ $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
+ $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
+ ) => { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
- $channel_manager.timer_tick_occurred();
+ $channel_manager.get_cm().timer_tick_occurred();
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
$chain_monitor.rebroadcast_pending_claims();
let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
+ let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
let mut have_pruned = false;
+ let mut have_decayed_scorer = false;
loop {
$process_channel_manager_events;
$process_chain_monitor_events;
+ $process_onion_message_handler_events;
// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
break;
}
- if $channel_manager.get_and_clear_needs_persistence() {
+ if $channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager...");
- $persister.persist_manager(&*$channel_manager)?;
+ $persister.persist_manager(&$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
}
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
- $channel_manager.timer_tick_occurred();
+ $channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
+ if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
+ log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
+ $peer_manager.onion_message_handler().timer_tick_occurred();
+ last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
+ }
if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
if should_prune {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
- #[cfg(feature = "std")] {
+ if let Some(duration_since_epoch) = $time_fetch() {
log_trace!($logger, "Pruning and persisting network graph.");
- network_graph.remove_stale_channels_and_tracking();
- }
- #[cfg(not(feature = "std"))] {
+ network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
+ } else {
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
log_trace!($logger, "Persisting network graph.");
}
last_prune_call = $get_timer(prune_timer);
}
+ if !have_decayed_scorer {
+ if let Some(ref scorer) = $scorer {
+ if let Some(duration_since_epoch) = $time_fetch() {
+ log_trace!($logger, "Calling time_passed on scorer at startup");
+ scorer.write_lock().time_passed(duration_since_epoch);
+ }
+ }
+ have_decayed_scorer = true;
+ }
+
if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer {
- log_trace!($logger, "Persisting scorer");
+ if let Some(duration_since_epoch) = $time_fetch() {
+ log_trace!($logger, "Calling time_passed and persisting scorer");
+ scorer.write_lock().time_passed(duration_since_epoch);
+ } else {
+ log_trace!($logger, "Persisting scorer");
+ }
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
- $persister.persist_manager(&*$channel_manager)?;
+ $persister.persist_manager(&$channel_manager)?;
// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
+/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
+/// no time is available, some features may be disabled, however the node will still operate fine.
+///
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
/// could setup `process_events_async` like this:
/// ```
/// # use lightning::io;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::time::SystemTime;
/// # use lightning_background_processor::{process_events_async, GossipSync};
-/// # struct MyStore {}
-/// # impl lightning::util::persist::KVStore for MyStore {
+/// # struct Logger {}
+/// # impl lightning::util::logger::Logger for Logger {
+/// # fn log(&self, _record: lightning::util::logger::Record) {}
+/// # }
+/// # struct Store {}
+/// # impl lightning::util::persist::KVStore for Store {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
-/// # struct MyEventHandler {}
-/// # impl MyEventHandler {
+/// # struct EventHandler {}
+/// # impl EventHandler {
/// # async fn handle_event(&self, _: lightning::events::Event) {}
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
-/// # struct MySocketDescriptor {}
-/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
+/// # struct SocketDescriptor {}
+/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
-/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
-/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
-/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
-/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
-/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
-/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
-/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
-/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
-/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
-/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
-/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
-/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
-///
-/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
-/// let background_persister = Arc::clone(&my_persister);
-/// let background_event_handler = Arc::clone(&my_event_handler);
-/// let background_chain_mon = Arc::clone(&my_chain_monitor);
-/// let background_chan_man = Arc::clone(&my_channel_manager);
-/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
-/// let background_peer_man = Arc::clone(&my_peer_manager);
-/// let background_logger = Arc::clone(&my_logger);
-/// let background_scorer = Arc::clone(&my_scorer);
+/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
+/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
+/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
+/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
+/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
+/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
+/// #
+/// # struct Node<
+/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
+/// # F: lightning::chain::Filter + Send + Sync + 'static,
+/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
+/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
+/// # > {
+/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
+/// # event_handler: Arc<EventHandler>,
+/// # channel_manager: Arc<ChannelManager<B, F, FE>>,
+/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
+/// # gossip_sync: Arc<P2PGossipSync<UL>>,
+/// # persister: Arc<Store>,
+/// # logger: Arc<Logger>,
+/// # scorer: Arc<Scorer>,
+/// # }
+/// #
+/// # async fn setup_background_processing<
+/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
+/// # F: lightning::chain::Filter + Send + Sync + 'static,
+/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
+/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
+/// # >(node: Node<B, F, FE, UL>) {
+/// let background_persister = Arc::clone(&node.persister);
+/// let background_event_handler = Arc::clone(&node.event_handler);
+/// let background_chain_mon = Arc::clone(&node.chain_monitor);
+/// let background_chan_man = Arc::clone(&node.channel_manager);
+/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
+/// let background_peer_man = Arc::clone(&node.peer_manager);
+/// let background_logger = Arc::clone(&node.logger);
+/// let background_scorer = Arc::clone(&node.scorer);
///
/// // Setup the sleeper.
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
/// Some(background_scorer),
/// sleeper,
/// mobile_interruptable_platform,
-/// )
-/// .await
-/// .expect("Failed to process events");
+/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
+/// )
+/// .await
+/// .expect("Failed to process events");
/// });
///
/// // Stop the background processing.
'a,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
- CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
- ES: 'static + Deref + Send + Sync,
- NS: 'static + Deref + Send + Sync,
- SP: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
- R: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
- CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+ M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
- APM: APeerManager + Send + Sync,
- PM: 'static + Deref<Target = APM> + Send + Sync,
+ PM: 'static + Deref + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
- Sleeper: Fn(Duration) -> SleepFuture
+ Sleeper: Fn(Duration) -> SleepFuture,
+ FetchTime: Fn() -> Option<Duration>,
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
- sleeper: Sleeper, mobile_interruptable_platform: bool,
+ sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
- CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
T::Target: 'static + BroadcasterInterface,
- ES::Target: 'static + EntropySource,
- NS::Target: 'static + NodeSigner,
- SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
- R::Target: 'static + Router,
L::Target: 'static + Logger,
- P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
- PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+ P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
+ PS::Target: 'static + Persister<'a, CM, L, SC>,
+ CM::Target: AChannelManager + Send + Sync,
+ PM::Target: APeerManager + Send + Sync,
{
let mut should_break = false;
let async_event_handler = |event| {
let scorer = &scorer;
let logger = &logger;
let persister = &persister;
+ let fetch_time = &fetch_time;
async move {
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
- if update_scorer(scorer, &event) {
- log_trace!(logger, "Persisting scorer after update");
- if let Err(e) = persister.persist_scorer(&scorer) {
- log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ if let Some(duration_since_epoch) = fetch_time() {
+ if update_scorer(scorer, &event, duration_since_epoch) {
+ log_trace!(logger, "Persisting scorer after update");
+ if let Err(e) = persister.persist_scorer(&scorer) {
+ log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ }
}
}
}
event_handler(event).await;
}
};
- define_run_body!(persister,
- chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
- channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
- gossip_sync, peer_manager, logger, scorer, should_break, {
+ define_run_body!(
+ persister, chain_monitor,
+ chain_monitor.process_pending_events_async(async_event_handler).await,
+ channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
+ peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
+ gossip_sync, logger, scorer, should_break, {
let fut = Selector {
- a: channel_manager.get_event_or_persistence_needed_future(),
+ a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
task::Poll::Ready(exit) => { should_break = exit; true },
task::Poll::Pending => false,
}
- }, mobile_interruptable_platform)
+ }, mobile_interruptable_platform, fetch_time,
+ )
+}
+
+#[cfg(feature = "futures")]
+async fn process_onion_message_handler_events_async<
+ EventHandlerFuture: core::future::Future<Output = ()>,
+ EventHandler: Fn(Event) -> EventHandlerFuture,
+ PM: 'static + Deref + Send + Sync,
+>(
+ peer_manager: &PM, handler: EventHandler
+)
+where
+ PM::Target: APeerManager + Send + Sync,
+{
+ let events = core::cell::RefCell::new(Vec::new());
+ peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
+
+ for event in events.into_inner() {
+ handler(event).await
+ }
}
#[cfg(feature = "std")]
'a,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
- CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
- ES: 'static + Deref + Send + Sync,
- NS: 'static + Deref + Send + Sync,
- SP: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
- R: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
- M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
- CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+ M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
- APM: APeerManager + Send + Sync,
- PM: 'static + Deref<Target = APM> + Send + Sync,
+ PM: 'static + Deref + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for <'b> WriteableScore<'b>,
>(
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
- CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
T::Target: 'static + BroadcasterInterface,
- ES::Target: 'static + EntropySource,
- NS::Target: 'static + NodeSigner,
- SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
- R::Target: 'static + Router,
L::Target: 'static + Logger,
- P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
- PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+ P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
+ PS::Target: 'static + Persister<'a, CM, L, SC>,
+ CM::Target: AChannelManager + Send + Sync,
+ PM::Target: APeerManager + Send + Sync,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
- if update_scorer(scorer, &event) {
+ use std::time::SystemTime;
+ let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Time should be sometime after 1970");
+ if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
event_handler.handle_event(event);
};
- define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
- channel_manager, channel_manager.process_pending_events(&event_handler),
- gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+ define_run_body!(
+ persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
+ channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
+ peer_manager,
+ peer_manager.onion_message_handler().process_pending_events(&event_handler),
+ gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
{ Sleeper::from_two_futures(
- channel_manager.get_event_or_persistence_needed_future(),
- chain_monitor.get_update_future()
+ &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+ &chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)); },
- |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
+ |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+ || {
+ use std::time::SystemTime;
+ Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Time should be sometime after 1970"))
+ },
+ )
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
#[cfg(all(feature = "std", test))]
mod tests {
+ use bitcoin::{ScriptBuf, Txid};
use bitcoin::blockdata::constants::{genesis_block, ChainHash};
use bitcoin::blockdata::locktime::absolute::LockTime;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
+ use bitcoin::hashes::Hash;
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
- use lightning::chain::{BestBlock, Confirm, chainmonitor};
+ use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
- use lightning::sign::{InMemorySigner, KeysManager};
+ use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource};
use lightning::chain::transaction::OutPoint;
use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
use lightning::{get_event_msg, get_event};
- use lightning::ln::PaymentHash;
+ use lightning::ln::types::{PaymentHash, ChannelId};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
use lightning::ln::features::{ChannelFeatures, NodeFeatures};
use lightning::ln::functional_test_utils::*;
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
- use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
- use lightning::routing::router::{DefaultRouter, Path, RouteHop};
+ use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
+ use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
use lightning::util::config::UserConfig;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
+ use lightning::util::sweep::{OutputSweeper, OutputSpendStatus};
use lightning_persister::fs_store::FilesystemStore;
use std::collections::VecDeque;
use std::{fs, env};
Arc<DefaultRouter<
Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
Arc<test_utils::TestLogger>,
+ Arc<KeysManager>,
Arc<LockingWrapper<TestScorer>>,
(),
TestScorer>
logger: Arc<test_utils::TestLogger>,
best_block: BestBlock,
scorer: Arc<LockingWrapper<TestScorer>>,
+ sweeper: Arc<OutputSweeper<Arc<test_utils::TestBroadcaster>, Arc<TestWallet>,
+ Arc<test_utils::TestFeeEstimator>, Arc<dyn Filter + Sync + Send>, Arc<FilesystemStore>,
+ Arc<test_utils::TestLogger>, Arc<KeysManager>>>,
}
impl Node {
impl ScoreLookUp for TestScorer {
type ScoreParams = ();
fn channel_penalty_msat(
- &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams
+ &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
) -> u64 { unimplemented!(); }
}
impl ScoreUpdate for TestScorer {
- fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
+ fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, short_channel_id } => {
}
}
- fn payment_path_successful(&mut self, actual_path: &Path) {
+ fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
}
}
- fn probe_failed(&mut self, actual_path: &Path, _: u64) {
+ fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
}
}
}
- fn probe_successful(&mut self, actual_path: &Path) {
+ fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
}
}
}
+ fn time_passed(&mut self, _: Duration) {}
}
#[cfg(c_bindings)]
}
}
+ struct TestWallet {}
+
+ impl ChangeDestinationSource for TestWallet {
+ fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
+ Ok(ScriptBuf::new())
+ }
+ }
+
fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.push(filename);
let genesis_block = genesis_block(network);
let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
+ let now = Duration::from_secs(genesis_block.header.time as u64);
let seed = [i as u8; 32];
- let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
+ let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
+ let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
let now = Duration::from_secs(genesis_block.header.time as u64);
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
+ let wallet = Arc::new(TestWallet {});
+ let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator),
+ None::<Arc<dyn Filter + Sync + Send>>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger)));
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
let msg_handler = MessageHandler {
onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
};
let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
- let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
+ let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper };
nodes.push(node);
}
fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
for i in 1..=depth {
- let prev_blockhash = node.best_block.block_hash();
- let height = node.best_block.height() + 1;
+ let prev_blockhash = node.best_block.block_hash;
+ let height = node.best_block.height + 1;
let header = create_dummy_header(prev_blockhash, height);
let txdata = vec![(0, tx)];
node.best_block = BestBlock::new(header.block_hash(), height);
1 => {
node.node.transactions_confirmed(&header, &txdata, height);
node.chain_monitor.transactions_confirmed(&header, &txdata, height);
+ node.sweeper.transactions_confirmed(&header, &txdata, height);
},
x if x == depth => {
+ // We need the TestBroadcaster to know about the new height so that it doesn't think
+ // we're violating the time lock requirements of transactions broadcasted at that
+ // point.
+ node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height));
node.node.best_block_updated(&header, height);
node.chain_monitor.best_block_updated(&header, height);
+ node.sweeper.best_block_updated(&header, height);
},
_ => {},
}
}
}
+
+ fn advance_chain(node: &mut Node, num_blocks: u32) {
+ for i in 1..=num_blocks {
+ let prev_blockhash = node.best_block.block_hash;
+ let height = node.best_block.height + 1;
+ let header = create_dummy_header(prev_blockhash, height);
+ node.best_block = BestBlock::new(header.block_hash(), height);
+ if i == num_blocks {
+ // We need the TestBroadcaster to know about the new height so that it doesn't think
+ // we're violating the time lock requirements of transactions broadcasted at that
+ // point.
+ node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height));
+ node.node.best_block_updated(&header, height);
+ node.chain_monitor.best_block_updated(&header, height);
+ node.sweeper.best_block_updated(&header, height);
+ }
+ }
+ }
+
fn confirm_transaction(node: &mut Node, tx: &Transaction) {
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
}
}
// Force-close the channel.
- nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
+ nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id()).unwrap();
// Check that the force-close updates are persisted.
check_persisted_data!(nodes[0].node, filepath.clone());
#[test]
fn test_timer_tick_called() {
- // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
- // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
- // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
+ // Test that:
+ // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
+ // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
+ // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
+ // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
+ let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
- log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() {
+ log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
+ log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
break
}
}
tokio::time::sleep(dur).await;
false // Never exit
})
- }, false,
+ }, false, || Some(Duration::ZERO),
);
match bp_future.await {
Ok(_) => panic!("Expected error persisting manager"),
let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
+ let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+ assert_eq!(broadcast_funding.txid(), funding_tx.txid());
+ assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
if !std::thread::panicking() {
bg_processor.stop().unwrap();
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
.expect("Events not handled within deadline");
match event {
- Event::SpendableOutputs { .. } => {},
+ Event::SpendableOutputs { outputs, channel_id } => {
+ nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)).unwrap();
+ },
_ => panic!("Unexpected event: {:?}", event),
}
+ // Check we don't generate an initial sweeping tx until we reach the required height.
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+ let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+ if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
+ assert!(!tracked_output.is_spent_in(&sweep_tx_0));
+ match tracked_output.status {
+ OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
+ assert_eq!(delayed_until_height, Some(153));
+ }
+ _ => panic!("Unexpected status"),
+ }
+ }
+
+ advance_chain(&mut nodes[0], 3);
+
+ // Check we generate an initial sweeping tx.
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+ let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+ let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+ match tracked_output.status {
+ OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
+ assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid());
+ }
+ _ => panic!("Unexpected status"),
+ }
+
+ // Check we regenerate and rebroadcast the sweeping tx each block.
+ advance_chain(&mut nodes[0], 1);
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+ let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+ let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+ match tracked_output.status {
+ OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
+ assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid());
+ }
+ _ => panic!("Unexpected status"),
+ }
+ assert_ne!(sweep_tx_0, sweep_tx_1);
+
+ advance_chain(&mut nodes[0], 1);
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+ let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+ let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+ match tracked_output.status {
+ OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
+ assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
+ }
+ _ => panic!("Unexpected status"),
+ }
+ assert_ne!(sweep_tx_0, sweep_tx_2);
+ assert_ne!(sweep_tx_1, sweep_tx_2);
+
+ // Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
+ confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+ let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+ match tracked_output.status {
+ OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
+ assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
+ }
+ _ => panic!("Unexpected status"),
+ }
+
+ // Check we still see the transaction as confirmed if we unconfirm any untracked
+ // transaction. (We previously had a bug that would mark tracked transactions as
+ // unconfirmed if any transaction at an unknown block height would be unconfirmed.)
+ let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
+ nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);
+
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+ let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+ match tracked_output.status {
+ OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
+ assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
+ }
+ _ => panic!("Unexpected status"),
+ }
+
+ // Check we stop tracking the spendable outputs when one of the txs reaches
+ // ANTI_REORG_DELAY confirmations.
+ confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY);
+ assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);
+
if !std::thread::panicking() {
bg_processor.stop().unwrap();
}
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
- let expected_log = "Persisting scorer".to_string();
+ let expected_log = "Calling time_passed and persisting scorer".to_string();
if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
break
}
_ = exit_receiver.changed() => true,
}
})
- }, false,
+ }, false, || Some(Duration::from_secs(1696300000)),
);
let t1 = tokio::spawn(bp_future);
_ = exit_receiver.changed() => true,
}
})
- }, false,
+ }, false, || Some(Duration::ZERO),
);
let t1 = tokio::spawn(bp_future);
let t2 = tokio::spawn(async move {