pub(crate) struct Selector<
A: Future<Output = ()> + Unpin,
B: Future<Output = ()> + Unpin,
- C: Future<Output = bool> + Unpin,
+ C: Future<Output = ()> + Unpin,
+ D: Future<Output = bool> + Unpin,
> {
pub a: A,
pub b: B,
pub c: C,
+ pub d: D,
}
+
pub(crate) enum SelectorOutput {
A,
B,
- C(bool),
+ C,
+ D(bool),
}
impl<
A: Future<Output = ()> + Unpin,
B: Future<Output = ()> + Unpin,
- C: Future<Output = bool> + Unpin,
- > Future for Selector<A, B, C>
+ C: Future<Output = ()> + Unpin,
+ D: Future<Output = bool> + Unpin,
+ > Future for Selector<A, B, C, D>
{
type Output = SelectorOutput;
fn poll(
Poll::Pending => {},
}
match Pin::new(&mut self.c).poll(ctx) {
+ Poll::Ready(()) => {
+ return Poll::Ready(SelectorOutput::C);
+ },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.d).poll(ctx) {
Poll::Ready(res) => {
- return Poll::Ready(SelectorOutput::C(res));
+ return Poll::Ready(SelectorOutput::D(res));
},
Poll::Pending => {},
}
}
}
+ /// A selector that takes a future wrapped in an option that will be polled if it is `Some` and
+ /// will always be pending otherwise.
+ pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
+ pub optional_future: Option<F>,
+ }
+
+ impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
+ type Output = ();
+ fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
+ match self.optional_future.as_mut() {
+ Some(f) => match Pin::new(f).poll(ctx) {
+ Poll::Ready(()) => {
+ self.optional_future.take();
+ Poll::Ready(())
+ },
+ Poll::Pending => Poll::Pending,
+ },
+ None => Poll::Pending,
+ }
+ }
+ }
+
// If we want to poll a future without an async context to figure out if it has completed or
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
// but sadly there's a good bit of boilerplate here.
#[cfg(feature = "futures")]
use core::task;
#[cfg(feature = "futures")]
-use futures_util::{dummy_waker, Selector, SelectorOutput};
+use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
/// Processes background events in a future.
///
scorer,
should_break,
{
+ let om_fut = if let Some(om) = onion_messenger.as_ref() {
+ let fut = om.get_om().get_update_future();
+ OptionalSelector { optional_future: Some(fut) }
+ } else {
+ OptionalSelector { optional_future: None }
+ };
let fut = Selector {
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
- c: sleeper(if mobile_interruptable_platform {
+ c: om_fut,
+ d: sleeper(if mobile_interruptable_platform {
Duration::from_millis(100)
} else {
Duration::from_secs(FASTEST_TIMER)
}),
};
match fut.await {
- SelectorOutput::A | SelectorOutput::B => {},
- SelectorOutput::C(exit) => {
+ SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
+ SelectorOutput::D(exit) => {
should_break = exit;
},
}
scorer,
stop_thread.load(Ordering::Acquire),
{
- Sleeper::from_two_futures(
- &channel_manager.get_cm().get_event_or_persistence_needed_future(),
- &chain_monitor.get_update_future(),
- )
- .wait_timeout(Duration::from_millis(100));
+ let sleeper = if let Some(om) = onion_messenger.as_ref() {
+ Sleeper::from_three_futures(
+ &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+ &chain_monitor.get_update_future(),
+ &om.get_om().get_update_future(),
+ )
+ } else {
+ Sleeper::from_two_futures(
+ &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+ &chain_monitor.get_update_future(),
+ )
+ };
+ sleeper.wait_timeout(Duration::from_millis(100));
},
|_| Instant::now(),
|time: &Instant, dur| time.elapsed().as_secs() > dur,
use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
use crate::util::logger::{Logger, WithContext};
use crate::util::ser::Writeable;
+use crate::util::wakers::{Future, Notifier};
use core::fmt;
use core::ops::Deref;
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
pending_peer_connected_events: Mutex<Vec<Event>>,
pending_events_processor: AtomicBool,
+ /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
+ /// it to give to users.
+ event_notifier: Notifier,
}
/// [`OnionMessage`]s buffered to be sent.
if $res.iter().any(|r| r.is_err()) {
// We failed handling some events. Return to have them eventually replayed.
$self.pending_events_processor.store(false, Ordering::Release);
+ $self.event_notifier.notify();
return;
}
}
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
pending_peer_connected_events: Mutex::new(Vec::new()),
pending_events_processor: AtomicBool::new(false),
+ event_notifier: Notifier::new(),
}
}
Some(addresses) => {
e.insert(OnionMessageRecipient::pending_connection(addresses))
.enqueue_message(onion_message);
+ self.event_notifier.notify();
Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
},
},
return
}
pending_intercepted_msgs_events.push(event);
+ self.event_notifier.notify();
+ }
+
+ /// Gets a [`Future`] that completes when an event is available via
+ /// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
+ ///
+ /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
+ /// [`OnionMessenger`] and should instead register actions to be taken later.
+ ///
+ /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
+ pub fn get_update_future(&self) -> Future {
+ self.event_notifier.get_future()
}
/// Processes any events asynchronously using the given handler.
pending_peer_connected_events.push(
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
);
+ self.event_notifier.notify();
}
} else {
self.message_recipients.lock().unwrap().remove(their_node_id);
Self { notifiers: vec![Arc::clone(&future.state)] }
}
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
- // Note that this is the common case - a ChannelManager and ChainMonitor.
pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
}
+ /// Constructs a new sleeper from three futures, allowing blocking on all three at once.
+ ///
+ // Note that this is the common case - a ChannelManager, a ChainMonitor, and an
+ // OnionMessenger.
+ pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
+ let notifiers = vec![
+ Arc::clone(&fut_a.state),
+ Arc::clone(&fut_b.state),
+ Arc::clone(&fut_c.state)
+ ];
+ Self { notifiers }
+ }
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
pub fn new(futures: Vec<Future>) -> Self {
Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }