bitcoin = "0.29.0"
lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
+futures = { version = "0.3", optional = true }
[dev-dependencies]
lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }
use std::time::{Duration, Instant};
use std::ops::Deref;
+#[cfg(feature = "futures")]
+use futures::{select, future::FutureExt};
+
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
}
}
+macro_rules! define_run_body {
+ ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+ $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+ $loop_exit_check: expr, $await: expr)
+ => { {
+ let event_handler = DecoratingEventHandler {
+ event_handler: $event_handler,
+ gossip_sync: &$gossip_sync,
+ };
+
+ log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+ $channel_manager.timer_tick_occurred();
+
+ let mut last_freshness_call = Instant::now();
+ let mut last_ping_call = Instant::now();
+ let mut last_prune_call = Instant::now();
+ let mut last_scorer_persist_call = Instant::now();
+ let mut have_pruned = false;
+
+ loop {
+ $channel_manager.process_pending_events(&event_handler);
+ $chain_monitor.process_pending_events(&event_handler);
+
+ // Note that the PeerManager::process_events may block on ChannelManager's locks,
+ // hence it comes last here. When the ChannelManager finishes whatever it's doing,
+ // we want to ensure we get into `persist_manager` as quickly as we can, especially
+ // without running the normal event processing above and handing events to users.
+ //
+ // Specifically, on an *extremely* slow machine, we may see ChannelManager start
+ // processing a message effectively at any point during this loop. In order to
+ // minimize the time between such processing completing and persisting the updated
+ // ChannelManager, we want to minimize methods blocking on a ChannelManager
+ // generally, and as a fallback place such blocking only immediately before
+ // persistence.
+ $peer_manager.process_events();
+
+ // We wait up to 100ms, but track how long it takes to detect being put to sleep,
+ // see `await_start`'s use below.
+ let await_start = Instant::now();
+ let updates_available = $await;
+ let await_time = await_start.elapsed();
+
+ if updates_available {
+ log_trace!($logger, "Persisting ChannelManager...");
+ $persister.persist_manager(&*$channel_manager)?;
+ log_trace!($logger, "Done persisting ChannelManager.");
+ }
+ // Exit the loop if the background processor was requested to stop.
+ if $loop_exit_check {
+ log_trace!($logger, "Terminating background processor.");
+ break;
+ }
+ if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+ log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+ $channel_manager.timer_tick_occurred();
+ last_freshness_call = Instant::now();
+ }
+ if await_time > Duration::from_secs(1) {
+ // On various platforms, we may be starved of CPU cycles for several reasons.
+ // E.g. on iOS, if we've been in the background, we will be entirely paused.
+ // Similarly, if we're on a desktop platform and the device has been asleep, we
+ // may not get any cycles.
+ // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+ // full second, at which point we assume sockets may have been killed (they
+ // appear to be at least on some platforms, even if it has only been a second).
+ // Note that we have to take care to not get here just because user event
+ // processing was slow at the top of the loop. For example, the sample client
+ // may call Bitcoin Core RPCs during event handling, which very often takes
+ // more than a handful of seconds to complete, and shouldn't disconnect all our
+ // peers.
+ log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+ $peer_manager.disconnect_all_peers();
+ last_ping_call = Instant::now();
+ } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+ log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+ $peer_manager.timer_tick_occurred();
+ last_ping_call = Instant::now();
+ }
+
+ // Note that we want to run a graph prune once not long after startup before
+ // falling back to our usual hourly prunes. This avoids short-lived clients never
+ // pruning their network graph. We run once 60 seconds after startup before
+ // continuing our normal cadence.
+ if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+ // The network graph must not be pruned while rapid sync completion is pending
+ log_trace!($logger, "Assessing prunability of network graph");
+ if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+ network_graph.remove_stale_channels();
+
+ if let Err(e) = $persister.persist_graph(network_graph) {
+ log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+ }
+
+ last_prune_call = Instant::now();
+ have_pruned = true;
+ } else {
+ log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+ }
+ }
+
+ if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+ if let Some(ref scorer) = $scorer {
+ log_trace!($logger, "Persisting scorer");
+ if let Err(e) = $persister.persist_scorer(&scorer) {
+ log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ }
+ }
+ last_scorer_persist_call = Instant::now();
+ }
+ }
+
+ // After we exit, ensure we persist the ChannelManager one final time - this avoids
+ // some races where users quit while channel updates were in-flight, with
+ // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+ $persister.persist_manager(&*$channel_manager)?;
+
+ // Persist Scorer on exit
+ if let Some(ref scorer) = $scorer {
+ $persister.persist_scorer(&scorer)?;
+ }
+
+ // Persist NetworkGraph on exit
+ if let Some(network_graph) = $gossip_sync.network_graph() {
+ $persister.persist_graph(network_graph)?;
+ }
+
+ Ok(())
+ } }
+}
+
+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
+/// future which outputs false, the loop will exit and this function's future will complete.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+ 'a,
+ Signer: 'static + Sign,
+ CA: 'static + Deref + Send + Sync,
+ CF: 'static + Deref + Send + Sync,
+ CW: 'static + Deref + Send + Sync,
+ T: 'static + Deref + Send + Sync,
+ K: 'static + Deref + Send + Sync,
+ F: 'static + Deref + Send + Sync,
+ G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+ L: 'static + Deref + Send + Sync,
+ P: 'static + Deref + Send + Sync,
+ Descriptor: 'static + SocketDescriptor + Send + Sync,
+ CMH: 'static + Deref + Send + Sync,
+ RMH: 'static + Deref + Send + Sync,
+ EH: 'static + EventHandler + Send,
+ PS: 'static + Deref + Send,
+ M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+ PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+ RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+ UMH: 'static + Deref + Send + Sync,
+ PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+ S: 'static + Deref<Target = SC> + Send + Sync,
+ SC: WriteableScore<'a>,
+ SleepFuture: core::future::Future<Output = bool>,
+ Sleeper: Fn(Duration) -> SleepFuture
+>(
+ persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+ gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+ sleeper: Sleeper,
+) -> Result<(), std::io::Error>
+where
+ CA::Target: 'static + chain::Access,
+ CF::Target: 'static + chain::Filter,
+ CW::Target: 'static + chain::Watch<Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ K::Target: 'static + KeysInterface<Signer = Signer>,
+ F::Target: 'static + FeeEstimator,
+ L::Target: 'static + Logger,
+ P::Target: 'static + Persist<Signer>,
+ CMH::Target: 'static + ChannelMessageHandler,
+ RMH::Target: 'static + RoutingMessageHandler,
+ UMH::Target: 'static + CustomMessageHandler,
+ PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+{
+ let mut should_continue = true;
+ define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+ gossip_sync, peer_manager, logger, scorer, should_continue, {
+ select! {
+ _ = channel_manager.get_persistable_update_future().fuse() => true,
+ cont = sleeper(Duration::from_millis(100)).fuse() => {
+ should_continue = cont;
+ false
+ }
+ }
+ })
+}
+
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
- let event_handler = DecoratingEventHandler {
- event_handler,
- gossip_sync: &gossip_sync,
- };
-
- log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
- channel_manager.timer_tick_occurred();
-
- let mut last_freshness_call = Instant::now();
- let mut last_ping_call = Instant::now();
- let mut last_prune_call = Instant::now();
- let mut last_scorer_persist_call = Instant::now();
- let mut have_pruned = false;
-
- loop {
- channel_manager.process_pending_events(&event_handler);
- chain_monitor.process_pending_events(&event_handler);
-
- // Note that the PeerManager::process_events may block on ChannelManager's locks,
- // hence it comes last here. When the ChannelManager finishes whatever it's doing,
- // we want to ensure we get into `persist_manager` as quickly as we can, especially
- // without running the normal event processing above and handing events to users.
- //
- // Specifically, on an *extremely* slow machine, we may see ChannelManager start
- // processing a message effectively at any point during this loop. In order to
- // minimize the time between such processing completing and persisting the updated
- // ChannelManager, we want to minimize methods blocking on a ChannelManager
- // generally, and as a fallback place such blocking only immediately before
- // persistence.
- peer_manager.process_events();
-
- // We wait up to 100ms, but track how long it takes to detect being put to sleep,
- // see `await_start`'s use below.
- let await_start = Instant::now();
- let updates_available =
- channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
- let await_time = await_start.elapsed();
-
- if updates_available {
- log_trace!(logger, "Persisting ChannelManager...");
- persister.persist_manager(&*channel_manager)?;
- log_trace!(logger, "Done persisting ChannelManager.");
- }
- // Exit the loop if the background processor was requested to stop.
- if stop_thread.load(Ordering::Acquire) == true {
- log_trace!(logger, "Terminating background processor.");
- break;
- }
- if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
- log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
- channel_manager.timer_tick_occurred();
- last_freshness_call = Instant::now();
- }
- if await_time > Duration::from_secs(1) {
- // On various platforms, we may be starved of CPU cycles for several reasons.
- // E.g. on iOS, if we've been in the background, we will be entirely paused.
- // Similarly, if we're on a desktop platform and the device has been asleep, we
- // may not get any cycles.
- // We detect this by checking if our max-100ms-sleep, above, ran longer than a
- // full second, at which point we assume sockets may have been killed (they
- // appear to be at least on some platforms, even if it has only been a second).
- // Note that we have to take care to not get here just because user event
- // processing was slow at the top of the loop. For example, the sample client
- // may call Bitcoin Core RPCs during event handling, which very often takes
- // more than a handful of seconds to complete, and shouldn't disconnect all our
- // peers.
- log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
- peer_manager.disconnect_all_peers();
- last_ping_call = Instant::now();
- } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
- log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
- peer_manager.timer_tick_occurred();
- last_ping_call = Instant::now();
- }
-
- // Note that we want to run a graph prune once not long after startup before
- // falling back to our usual hourly prunes. This avoids short-lived clients never
- // pruning their network graph. We run once 60 seconds after startup before
- // continuing our normal cadence.
- if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
- // The network graph must not be pruned while rapid sync completion is pending
- log_trace!(logger, "Assessing prunability of network graph");
- if let Some(network_graph) = gossip_sync.prunable_network_graph() {
- network_graph.remove_stale_channels();
-
- if let Err(e) = persister.persist_graph(network_graph) {
- log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
- }
-
- last_prune_call = Instant::now();
- have_pruned = true;
- } else {
- log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
- }
- }
-
- if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
- if let Some(ref scorer) = scorer {
- log_trace!(logger, "Persisting scorer");
- if let Err(e) = persister.persist_scorer(&scorer) {
- log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
- }
- }
- last_scorer_persist_call = Instant::now();
- }
- }
-
- // After we exit, ensure we persist the ChannelManager one final time - this avoids
- // some races where users quit while channel updates were in-flight, with
- // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
- persister.persist_manager(&*channel_manager)?;
-
- // Persist Scorer on exit
- if let Some(ref scorer) = scorer {
- persister.persist_scorer(&scorer)?;
- }
-
- // Persist NetworkGraph on exit
- if let Some(network_graph) = gossip_sync.network_graph() {
- persister.persist_graph(network_graph)?;
- }
-
- Ok(())
+ define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+ gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+ channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
pub mod chain;
pub mod ln;
pub mod routing;
-#[cfg(fuzzing)]
pub mod onion_message;
-#[cfg(not(fuzzing))]
-#[allow(unused)]
-mod onion_message; // To be exposed after sending/receiving OMs is supported in PeerManager.
#[cfg(feature = "std")]
/// Re-export of either `core2::io` or `std::io`, depending on the `std` feature flag.
use util::config::{UserConfig, ChannelConfig};
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
use util::{byte_utils, events};
+use util::crypto::sign;
+use util::wakers::{Future, Notifier};
use util::scid_utils::fake_scid;
use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
use util::logger::{Level, Logger};
use core::{cmp, mem};
use core::cell::RefCell;
use io::Read;
-use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
+use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::time::Duration;
use core::ops::Deref;
-#[cfg(any(test, feature = "std"))]
-use std::time::Instant;
-use util::crypto::sign;
-
// We hold various information about HTLC relay in the HTLC objects in Channel itself:
//
// Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should
/// Taken first everywhere where we are making changes before any other locks.
/// When acquiring this lock in read mode, rather than acquiring it directly, call
/// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
- /// PersistenceNotifier the lock contains sends out a notification when the lock is released.
+ /// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,
- persistence_notifier: PersistenceNotifier,
+ persistence_notifier: Notifier,
keys_manager: K,
/// notify or not based on whether relevant changes have been made, providing a closure to
/// `optionally_notify` which returns a `NotifyOption`.
struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
- persistence_notifier: &'a PersistenceNotifier,
+ persistence_notifier: &'a Notifier,
should_persist: F,
// We hold onto this result so the lock doesn't get released immediately.
_read_guard: RwLockReadGuard<'a, ()>,
}
impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
- fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+ fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist })
}
- fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
+ fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
let read_guard = lock.read().unwrap();
PersistenceNotifierGuard {
pending_events: Mutex::new(Vec::new()),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
- persistence_notifier: PersistenceNotifier::new(),
+ persistence_notifier: Notifier::new(),
keys_manager,
if were_node_one == msg_from_node_one {
return Ok(NotifyOption::SkipPersist);
} else {
+ log_debug!(self.logger, "Received channel_update for channel {}.", log_bytes!(chan_id));
try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan);
}
},
self.persistence_notifier.wait()
}
+ /// Gets a [`Future`] that completes when a persistable update is available. Note that
+ /// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
+ /// should instead register actions to be taken later.
+ pub fn get_persistable_update_future(&self) -> Future {
+ self.persistence_notifier.get_future()
+ }
+
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_persistence_condvar_value(&self) -> bool {
- let mutcond = &self.persistence_notifier.persistence_lock;
- let &(ref mtx, _) = mutcond;
- let guard = mtx.lock().unwrap();
- *guard
+ self.persistence_notifier.notify_pending()
}
/// Gets the latest best block which was connected either via the [`chain::Listen`] or
}
}
-/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
-/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
-struct PersistenceNotifier {
- /// Users won't access the persistence_lock directly, but rather wait on its bool using
- /// `wait_timeout` and `wait`.
- persistence_lock: (Mutex<bool>, Condvar),
-}
-
-impl PersistenceNotifier {
- fn new() -> Self {
- Self {
- persistence_lock: (Mutex::new(false), Condvar::new()),
- }
- }
-
- fn wait(&self) {
- loop {
- let &(ref mtx, ref cvar) = &self.persistence_lock;
- let mut guard = mtx.lock().unwrap();
- if *guard {
- *guard = false;
- return;
- }
- guard = cvar.wait(guard).unwrap();
- let result = *guard;
- if result {
- *guard = false;
- return
- }
- }
- }
-
- #[cfg(any(test, feature = "std"))]
- fn wait_timeout(&self, max_wait: Duration) -> bool {
- let current_time = Instant::now();
- loop {
- let &(ref mtx, ref cvar) = &self.persistence_lock;
- let mut guard = mtx.lock().unwrap();
- if *guard {
- *guard = false;
- return true;
- }
- guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
- // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
- // desired wait time has actually passed, and if not then restart the loop with a reduced wait
- // time. Note that this logic can be highly simplified through the use of
- // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
- // 1.42.0.
- let elapsed = current_time.elapsed();
- let result = *guard;
- if result || elapsed >= max_wait {
- *guard = false;
- return result;
- }
- match max_wait.checked_sub(elapsed) {
- None => return result,
- Some(_) => continue
- }
- }
- }
-
- // Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
- fn notify(&self) {
- let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
- let mut persistence_lock = persist_mtx.lock().unwrap();
- *persistence_lock = true;
- mem::drop(persistence_lock);
- cnd.notify_all();
- }
-}
-
const SERIALIZATION_VERSION: u8 = 1;
const MIN_SERIALIZATION_VERSION: u8 = 1;
pending_events: Mutex::new(pending_events_read),
pending_background_events: Mutex::new(pending_background_events_read),
total_consistency_lock: RwLock::new(()),
- persistence_notifier: PersistenceNotifier::new(),
+ persistence_notifier: Notifier::new(),
keys_manager: args.keys_manager,
logger: args.logger,
use util::test_utils;
use chain::keysinterface::KeysInterface;
- #[cfg(feature = "std")]
- #[test]
- fn test_wait_timeout() {
- use ln::channelmanager::PersistenceNotifier;
- use sync::Arc;
- use core::sync::atomic::AtomicBool;
- use std::thread;
-
- let persistence_notifier = Arc::new(PersistenceNotifier::new());
- let thread_notifier = Arc::clone(&persistence_notifier);
-
- let exit_thread = Arc::new(AtomicBool::new(false));
- let exit_thread_clone = exit_thread.clone();
- thread::spawn(move || {
- loop {
- let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
- let mut persistence_lock = persist_mtx.lock().unwrap();
- *persistence_lock = true;
- cnd.notify_all();
-
- if exit_thread_clone.load(Ordering::SeqCst) {
- break
- }
- }
- });
-
- // Check that we can block indefinitely until updates are available.
- let _ = persistence_notifier.wait();
-
- // Check that the PersistenceNotifier will return after the given duration if updates are
- // available.
- loop {
- if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
- break
- }
- }
-
- exit_thread.store(true, Ordering::SeqCst);
-
- // Check that the PersistenceNotifier will return after the given duration even if no updates
- // are available.
- loop {
- if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
- break
- }
- }
- }
-
#[test]
fn test_notify_limits() {
// Check that a few cases which don't require the persistence of a new ChannelManager,
define_feature!(27, ShutdownAnySegwit, [InitContext, NodeContext],
"Feature flags for `opt_shutdown_anysegwit`.", set_shutdown_any_segwit_optional,
set_shutdown_any_segwit_required, supports_shutdown_anysegwit, requires_shutdown_anysegwit);
+ // We do not yet advertise the onion messages feature bit, but we need to detect when peers
+ // support it.
+ define_feature!(39, OnionMessages, [InitContext, NodeContext],
+ "Feature flags for `option_onion_messages`.", set_onion_messages_optional,
+ set_onion_messages_required, supports_onion_messages, requires_onion_messages);
define_feature!(45, ChannelType, [InitContext, NodeContext],
"Feature flags for `option_channel_type`.", set_channel_type_optional,
set_channel_type_required, supports_channel_type, requires_channel_type);
impl<T: sealed::InitialRoutingSync> Features<T> {
// We are no longer setting initial_routing_sync now that gossip_queries
- // is enabled. This feature is ignored by a peer when gossip_queries has
+ // is enabled. This feature is ignored by a peer when gossip_queries has
// been negotiated.
#[cfg(test)]
pub(crate) fn clear_initial_routing_sync(&mut self) {
pub trait OnionMessageHandler : OnionMessageProvider {
/// Handle an incoming onion_message message from the given peer.
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
+ /// Called when a connection is established with a peer. Can be used to track which peers
+ /// advertise onion message support and are online.
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &Init);
+ /// Indicates a connection to the peer failed/an existing connection was lost. Allows handlers to
+ /// drop and refuse to forward onion messages to this peer.
+ fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
}
mod fuzzy_internal_msgs {
}
impl OnionMessageHandler for IgnoringMessageHandler {
fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
+ fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
+ fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
}
impl Deref for IgnoringMessageHandler {
type Target = IgnoringMessageHandler;
}
self.message_handler.route_handler.peer_connected(&their_node_id, &msg);
-
self.message_handler.chan_handler.peer_connected(&their_node_id, &msg);
+ self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg);
+
peer_lock.their_features = Some(msg.features);
return Ok(None);
} else if peer_lock.their_features.is_none() {
}
descriptor.disconnect_socket();
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
}
}
}
log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible);
}
}
};
log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
peers_lock.remove(&descriptor);
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible);
descriptor.disconnect_socket();
}
}
if let Some(node_id) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
}
descriptor.disconnect_socket();
}
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
}
}
}
//! Onion message testing and test utilities live here.
use chain::keysinterface::{KeysInterface, Recipient};
-use ln::msgs::OnionMessageHandler;
+use ln::features::InitFeatures;
+use ln::msgs::{self, OnionMessageHandler};
use super::{BlindedRoute, Destination, OnionMessenger, SendError};
use util::enforcing_trait_impls::EnforcingSigner;
use util::test_utils;
use bitcoin::network::constants::Network;
-use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
+use bitcoin::secp256k1::{PublicKey, Secp256k1};
use sync::Arc;
}
fn create_nodes(num_messengers: u8) -> Vec<MessengerNode> {
- let mut res = Vec::new();
+ let mut nodes = Vec::new();
for i in 0..num_messengers {
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let seed = [i as u8; 32];
let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet));
- res.push(MessengerNode {
+ nodes.push(MessengerNode {
keys_manager: keys_manager.clone(),
messenger: OnionMessenger::new(keys_manager, logger.clone()),
logger,
});
}
- res
+ for idx in 0..num_messengers - 1 {
+ let i = idx as usize;
+ let mut features = InitFeatures::known();
+ features.set_onion_messages_optional();
+ let init_msg = msgs::Init { features, remote_network_address: None };
+ nodes[i].messenger.peer_connected(&nodes[i + 1].get_node_pk(), &init_msg.clone());
+ nodes[i + 1].messenger.peer_connected(&nodes[i].get_node_pk(), &init_msg.clone());
+ }
+ nodes
}
fn pass_along_path(path: &Vec<MessengerNode>, expected_path_id: Option<[u8; 32]>) {
let num_nodes = path.len();
for (idx, node) in path.into_iter().skip(1).enumerate() {
let events = prev_node.messenger.release_pending_msgs();
- assert_eq!(events.len(), 1);
let onion_msg = {
let msgs = events.get(&node.get_node_pk()).unwrap();
assert_eq!(msgs.len(), 1);
#[test]
fn too_big_packet_error() {
// Make sure we error as expected if a packet is too big to send.
- let nodes = create_nodes(1);
-
- let hop_secret = SecretKey::from_slice(&hex::decode("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap();
- let secp_ctx = Secp256k1::new();
- let hop_node_id = PublicKey::from_secret_key(&secp_ctx, &hop_secret);
+ let nodes = create_nodes(2);
+ let hop_node_id = nodes[1].get_node_pk();
let hops = [hop_node_id; 400];
let err = nodes[0].messenger.send_onion_message(&hops, Destination::Node(hop_node_id), None).unwrap_err();
assert_eq!(err, SendError::TooBigPacket);
#[test]
fn invalid_blinded_route_error() {
// Make sure we error as expected if a provided blinded route has 0 or 1 hops.
- let mut nodes = create_nodes(3);
+ let nodes = create_nodes(3);
// 0 hops
let secp_ctx = Secp256k1::new();
#[test]
fn reply_path() {
- let mut nodes = create_nodes(4);
+ let nodes = create_nodes(4);
let secp_ctx = Secp256k1::new();
// Destination::Node
"lightning::onion_message::messenger".to_string(),
format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
}
+
+#[test]
+fn peer_buffer_full() {
+ let nodes = create_nodes(2);
+ for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
+ nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
+ }
+ let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
+ assert_eq!(err, SendError::BufferFull);
+}
use super::utils;
use util::events::OnionMessageProvider;
use util::logger::Logger;
+use util::ser::Writeable;
-use core::mem;
use core::ops::Deref;
use sync::{Arc, Mutex};
use prelude::*;
///
/// # Example
///
-// Needs to be `ignore` until the `onion_message` module is made public, otherwise this is a test
-// failure.
-/// ```ignore
+/// ```
/// # extern crate bitcoin;
/// # use bitcoin::hashes::_export::_core::time::Duration;
/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
///
/// // Send an empty onion message to a node id.
/// let intermediate_hops = [hop_node_id1, hop_node_id2];
-/// onion_messenger.send_onion_message(&intermediate_hops, Destination::Node(destination_node_id));
+/// let reply_path = None;
+/// onion_messenger.send_onion_message(&intermediate_hops, Destination::Node(destination_node_id), reply_path);
///
/// // Create a blinded route to yourself, for someone to send an onion message to.
/// # let your_node_id = hop_node_id1;
///
/// // Send an empty onion message to a blinded route.
/// # let intermediate_hops = [hop_node_id1, hop_node_id2];
-/// onion_messenger.send_onion_message(&intermediate_hops, Destination::BlindedRoute(blinded_route));
+/// let reply_path = None;
+/// onion_messenger.send_onion_message(&intermediate_hops, Destination::BlindedRoute(blinded_route), reply_path);
/// ```
///
/// [offers]: <https://github.com/lightning/bolts/pull/798>
/// The provided [`Destination`] was an invalid [`BlindedRoute`], due to having fewer than two
/// blinded hops.
TooFewBlindedHops,
+ /// Our next-hop peer was offline or does not support onion message forwarding.
+ InvalidFirstHop,
+ /// Our next-hop peer's buffer was full or our total outbound buffer was full.
+ BufferFull,
}
impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
.map_err(|e| SendError::Secp256k1(e))?;
let prng_seed = self.keys_manager.get_secure_random_bytes();
- let onion_packet = construct_onion_message_packet(
+ let onion_routing_packet = construct_onion_message_packet(
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
- let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert_with(VecDeque::new);
- pending_msgs.push_back(
- msgs::OnionMessage {
- blinding_point,
- onion_routing_packet: onion_packet,
+ if outbound_buffer_full(&introduction_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
+ match pending_per_peer_msgs.entry(introduction_node_id) {
+ hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().push_back(msgs::OnionMessage { blinding_point, onion_routing_packet });
+ Ok(())
}
- );
- Ok(())
+ }
}
#[cfg(test)]
pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<msgs::OnionMessage>> {
let mut pending_msgs = self.pending_messages.lock().unwrap();
let mut msgs = HashMap::new();
- core::mem::swap(&mut *pending_msgs, &mut msgs);
+ // We don't want to disconnect the peers by removing them entirely from the original map, so we
+ // swap the pending message buffers individually.
+ for (peer_node_id, pending_messages) in &mut *pending_msgs {
+ msgs.insert(*peer_node_id, core::mem::take(pending_messages));
+ }
msgs
}
}
+fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
+ const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
+ const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
+ let mut total_buffered_bytes = 0;
+ let mut peer_buffered_bytes = 0;
+ for (pk, peer_buf) in buffer {
+ for om in peer_buf {
+ let om_len = om.serialized_length();
+ if pk == peer_node_id {
+ peer_buffered_bytes += om_len;
+ }
+ total_buffered_bytes += om_len;
+
+ if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
+ peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
+ {
+ return true
+ }
+ }
+ }
+ false
+}
+
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
where K::Target: KeysInterface<Signer = Signer>,
L::Target: Logger,
hop_data: new_packet_bytes,
hmac: next_hop_hmac,
};
-
- let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
- let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
- pending_msgs.push_back(
- msgs::OnionMessage {
- blinding_point: match next_blinding_override {
- Some(blinding_point) => blinding_point,
- None => {
- let blinding_factor = {
- let mut sha = Sha256::engine();
- sha.input(&msg.blinding_point.serialize()[..]);
- sha.input(control_tlvs_ss.as_ref());
- Sha256::from_engine(sha).into_inner()
- };
- let next_blinding_point = msg.blinding_point;
- match next_blinding_point.mul_tweak(&self.secp_ctx, &Scalar::from_be_bytes(blinding_factor).unwrap()) {
- Ok(bp) => bp,
- Err(e) => {
- log_trace!(self.logger, "Failed to compute next blinding point: {}", e);
- return
- }
+ let onion_message = msgs::OnionMessage {
+ blinding_point: match next_blinding_override {
+ Some(blinding_point) => blinding_point,
+ None => {
+ let blinding_factor = {
+ let mut sha = Sha256::engine();
+ sha.input(&msg.blinding_point.serialize()[..]);
+ sha.input(control_tlvs_ss.as_ref());
+ Sha256::from_engine(sha).into_inner()
+ };
+ let next_blinding_point = msg.blinding_point;
+ match next_blinding_point.mul_tweak(&self.secp_ctx, &Scalar::from_be_bytes(blinding_factor).unwrap()) {
+ Ok(bp) => bp,
+ Err(e) => {
+ log_trace!(self.logger, "Failed to compute next blinding point: {}", e);
+ return
}
- },
+ }
},
- onion_routing_packet: outgoing_packet,
},
- );
- log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
+ onion_routing_packet: outgoing_packet,
+ };
+
+ let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
+ if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
+ log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
+ return
+ }
+
+ #[cfg(fuzzing)]
+ pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
+
+ match pending_per_peer_msgs.entry(next_node_id) {
+ hash_map::Entry::Vacant(_) => {
+ log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id);
+ return
+ },
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().push_back(onion_message);
+ log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
+ }
+ };
},
Err(e) => {
log_trace!(self.logger, "Errored decoding onion message packet: {:?}", e);
},
};
}
+
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init) {
+ if init.features.supports_onion_messages() {
+ let mut peers = self.pending_messages.lock().unwrap();
+ peers.insert(their_node_id.clone(), VecDeque::new());
+ }
+ }
+
+ fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) {
+ let mut pending_msgs = self.pending_messages.lock().unwrap();
+ pending_msgs.remove(their_node_id);
+ }
}
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
while read_idx < hop_data_len {
let mut read_buffer = [0; READ_BUFFER_SIZE];
let read_amt = cmp::min(hop_data_len - read_idx, READ_BUFFER_SIZE);
- r.read_exact(&mut read_buffer[..read_amt]);
+ r.read_exact(&mut read_buffer[..read_amt])?;
hop_data.extend_from_slice(&read_buffer[..read_amt]);
read_idx += read_amt;
}
// Uses the provided secret to simultaneously decode and decrypt the control TLVs.
impl ReadableArgs<SharedSecret> for Payload {
- fn read<R: Read>(mut r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
+ fn read<R: Read>(r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
let v: BigSize = Readable::read(r)?;
let mut rd = FixedLengthReader::new(r, v.0);
let mut reply_path: Option<BlindedRoute> = None;
pub mod message_signing;
pub mod invoice;
pub mod persist;
+pub mod wakers;
pub(crate) mod atomic_counter;
pub(crate) mod byte_utils;
}
fn duration_since(&self, earlier: Self) -> Duration {
- self.duration_since(earlier)
+ // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
+ // However, we support rust versions prior to 1.60 and some users appear to have "monotonic
+ // clocks" that go backwards in practice (likely relatively ancient kernels/etc). Thus, we
+ // manually check for time going backwards here and return a duration of zero in that case.
+ let now = Self::now();
+ if now > earlier { now - earlier } else { Duration::from_secs(0) }
}
fn duration_since_epoch() -> Duration {
--- /dev/null
+// This file is Copyright its original authors, visible in version control
+// history.
+//
+// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
+// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
+//! Utilities which allow users to block on some future notification from LDK. These are
+//! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to
+//! be re-persisted.
+//!
+//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
+
+use alloc::sync::Arc;
+use core::mem;
+use core::time::Duration;
+use sync::{Condvar, Mutex};
+
+use prelude::{Box, Vec};
+
+#[cfg(any(test, feature = "std"))]
+use std::time::Instant;
+
+use core::future::Future as StdFuture;
+use core::task::{Context, Poll};
+use core::pin::Pin;
+
+use prelude::*;
+
+/// Used to signal to one of many waiters that the condition they're waiting on has happened.
+pub(crate) struct Notifier {
+ notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
+ condvar: Condvar,
+}
+
+impl Notifier {
+ pub(crate) fn new() -> Self {
+ Self {
+ notify_pending: Mutex::new((false, None)),
+ condvar: Condvar::new(),
+ }
+ }
+
+ pub(crate) fn wait(&self) {
+ loop {
+ let mut guard = self.notify_pending.lock().unwrap();
+ if guard.0 {
+ guard.0 = false;
+ return;
+ }
+ guard = self.condvar.wait(guard).unwrap();
+ let result = guard.0;
+ if result {
+ guard.0 = false;
+ return
+ }
+ }
+ }
+
+ #[cfg(any(test, feature = "std"))]
+ pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
+ let current_time = Instant::now();
+ loop {
+ let mut guard = self.notify_pending.lock().unwrap();
+ if guard.0 {
+ guard.0 = false;
+ return true;
+ }
+ guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
+ // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
+ // desired wait time has actually passed, and if not then restart the loop with a reduced wait
+ // time. Note that this logic can be highly simplified through the use of
+ // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
+ // 1.42.0.
+ let elapsed = current_time.elapsed();
+ let result = guard.0;
+ if result || elapsed >= max_wait {
+ guard.0 = false;
+ return result;
+ }
+ match max_wait.checked_sub(elapsed) {
+ None => return result,
+ Some(_) => continue
+ }
+ }
+ }
+
+ /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
+ pub(crate) fn notify(&self) {
+ let mut lock = self.notify_pending.lock().unwrap();
+ lock.0 = true;
+ if let Some(future_state) = lock.1.take() {
+ future_state.lock().unwrap().complete();
+ }
+ mem::drop(lock);
+ self.condvar.notify_all();
+ }
+
+ /// Gets a [`Future`] that will get woken up with any waiters
+ pub(crate) fn get_future(&self) -> Future {
+ let mut lock = self.notify_pending.lock().unwrap();
+ if lock.0 {
+ Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ complete: false,
+ }))
+ }
+ } else if let Some(existing_state) = &lock.1 {
+ Future { state: Arc::clone(&existing_state) }
+ } else {
+ let state = Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ complete: false,
+ }));
+ lock.1 = Some(Arc::clone(&state));
+ Future { state }
+ }
+ }
+
+ #[cfg(any(test, feature = "_test_utils"))]
+ pub fn notify_pending(&self) -> bool {
+ self.notify_pending.lock().unwrap().0
+ }
+}
+
+/// A callback which is called when a [`Future`] completes.
+///
+/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
+/// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`]
+/// instead.
+///
+/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
+/// futures when they receive a wake, rather than immediately executing them.
+pub trait FutureCallback : Send {
+ /// The method which is called.
+ fn call(&self);
+}
+
+impl<F: Fn() + Send> FutureCallback for F {
+ fn call(&self) { (self)(); }
+}
+
+pub(crate) struct FutureState {
+ callbacks: Vec<Box<dyn FutureCallback>>,
+ complete: bool,
+}
+
+impl FutureState {
+ fn complete(&mut self) {
+ for callback in self.callbacks.drain(..) {
+ callback.call();
+ }
+ self.complete = true;
+ }
+}
+
+/// A simple future which can complete once, and calls some callback(s) when it does so.
+pub struct Future {
+ state: Arc<Mutex<FutureState>>,
+}
+
+impl Future {
+ /// Registers a callback to be called upon completion of this future. If the future has already
+ /// completed, the callback will be called immediately.
+ pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
+ let mut state = self.state.lock().unwrap();
+ if state.complete {
+ mem::drop(state);
+ callback.call();
+ } else {
+ state.callbacks.push(callback);
+ }
+ }
+}
+
+mod std_future {
+ use core::task::Waker;
+ pub struct StdWaker(pub Waker);
+ impl super::FutureCallback for StdWaker {
+ fn call(&self) { self.0.wake_by_ref() }
+ }
+}
+
+/// (C-not exported) as Rust Futures aren't usable in language bindings.
+impl<'a> StdFuture for Future {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut state = self.state.lock().unwrap();
+ if state.complete {
+ Poll::Ready(())
+ } else {
+ let waker = cx.waker().clone();
+ state.callbacks.push(Box::new(std_future::StdWaker(waker)));
+ Poll::Pending
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use core::sync::atomic::{AtomicBool, Ordering};
+ use core::future::Future as FutureTrait;
+ use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+ #[cfg(feature = "std")]
+ #[test]
+ fn test_wait_timeout() {
+ use sync::Arc;
+ use std::thread;
+
+ let persistence_notifier = Arc::new(Notifier::new());
+ let thread_notifier = Arc::clone(&persistence_notifier);
+
+ let exit_thread = Arc::new(AtomicBool::new(false));
+ let exit_thread_clone = exit_thread.clone();
+ thread::spawn(move || {
+ loop {
+ let mut lock = thread_notifier.notify_pending.lock().unwrap();
+ lock.0 = true;
+ thread_notifier.condvar.notify_all();
+
+ if exit_thread_clone.load(Ordering::SeqCst) {
+ break
+ }
+ }
+ });
+
+ // Check that we can block indefinitely until updates are available.
+ let _ = persistence_notifier.wait();
+
+ // Check that the Notifier will return after the given duration if updates are
+ // available.
+ loop {
+ if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
+ break
+ }
+ }
+
+ exit_thread.store(true, Ordering::SeqCst);
+
+ // Check that the Notifier will return after the given duration even if no updates
+ // are available.
+ loop {
+ if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
+ break
+ }
+ }
+ }
+
+ #[test]
+ fn test_future_callbacks() {
+ let future = Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ complete: false,
+ }))
+ };
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+
+ assert!(!callback.load(Ordering::SeqCst));
+ future.state.lock().unwrap().complete();
+ assert!(callback.load(Ordering::SeqCst));
+ future.state.lock().unwrap().complete();
+ }
+
+ #[test]
+ fn test_pre_completed_future_callbacks() {
+ let future = Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ complete: false,
+ }))
+ };
+ future.state.lock().unwrap().complete();
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+
+ assert!(callback.load(Ordering::SeqCst));
+ assert!(future.state.lock().unwrap().callbacks.is_empty());
+ }
+
+ // Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
+ // totally possible to construct from a trait implementation (though somewhat less effecient
+ // compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
+ // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
+ const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
+ unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
+ unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; Box::from_raw(p); }
+ unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
+ unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
+ let p = ptr as *const Arc<AtomicBool>;
+ RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
+ }
+
+ fn create_waker() -> (Arc<AtomicBool>, Waker) {
+ let a = Arc::new(AtomicBool::new(false));
+ let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
+ (a, waker)
+ }
+
+ #[test]
+ fn test_future() {
+ let mut future = Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ complete: false,
+ }))
+ };
+ let mut second_future = Future { state: Arc::clone(&future.state) };
+
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+ assert!(!woken.load(Ordering::SeqCst));
+
+ let (second_woken, second_waker) = create_waker();
+ assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
+ assert!(!second_woken.load(Ordering::SeqCst));
+
+ future.state.lock().unwrap().complete();
+ assert!(woken.load(Ordering::SeqCst));
+ assert!(second_woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
+ }
+}