use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
+#[cfg(feature = "futures")]
+use lightning::events::ReplayEvent;
use lightning::events::{Event, PathFailure};
use lightning::ln::channelmanager::AChannelManager;
/// could setup `process_events_async` like this:
/// ```
/// # use lightning::io;
+/// # use lightning::events::ReplayEvent;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # }
/// # struct EventHandler {}
/// # impl EventHandler {
-/// # async fn handle_event(&self, _: lightning::events::Event) {}
+/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct SocketDescriptor {}
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
- EventHandlerFuture: core::future::Future<Output = ()>,
+ EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
M: 'static
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&scorer) {
- log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
+ // We opt not to abort early on persistence failure here as persisting
+ // the scorer is non-critical and we still hope that it will have
+ // resolved itself when it is potentially critical in event handling
+ // below.
}
}
}
}
- event_handler(event).await;
+ event_handler(event).await
})
};
define_run_body!(
}
}
}
- event_handler.handle_event(event);
+ event_handler.handle_event(event)
};
define_run_body!(
persister,
// Initiate the background processors to watch each node.
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let bg_processor = BackgroundProcessor::start(
persister,
event_handler,
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let bg_processor = BackgroundProcessor::start(
persister,
event_handler,
let persister = Arc::new(
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
);
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let bg_processor = BackgroundProcessor::start(
persister,
event_handler,
let bp_future = super::process_events_async(
persister,
- |_: _| async {},
+ |_: _| async { Ok(()) },
nodes[0].chain_monitor.clone(),
nodes[0].node.clone(),
Some(nodes[0].messenger.clone()),
let data_dir = nodes[0].kv_store.get_data_dir();
let persister =
Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let bg_processor = BackgroundProcessor::start(
persister,
event_handler,
let data_dir = nodes[0].kv_store.get_data_dir();
let persister =
Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let bg_processor = BackgroundProcessor::start(
persister,
event_handler,
// Set up a background event handler for FundingGenerationReady events.
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
- let event_handler = move |event: Event| match event {
- Event::FundingGenerationReady { .. } => funding_generation_send
- .send(handle_funding_generation_ready!(event, channel_value))
- .unwrap(),
- Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
- Event::ChannelReady { .. } => {},
- _ => panic!("Unexpected event: {:?}", event),
+ let event_handler = move |event: Event| {
+ match event {
+ Event::FundingGenerationReady { .. } => funding_generation_send
+ .send(handle_funding_generation_ready!(event, channel_value))
+ .unwrap(),
+ Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
+ Event::ChannelReady { .. } => {},
+ _ => panic!("Unexpected event: {:?}", event),
+ }
+ Ok(())
};
let bg_processor = BackgroundProcessor::start(
// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
- let event_handler = move |event: Event| match event {
- Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
- Event::ChannelReady { .. } => {},
- Event::ChannelClosed { .. } => {},
- _ => panic!("Unexpected event: {:?}", event),
+ let event_handler = move |event: Event| {
+ match event {
+ Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
+ Event::ChannelReady { .. } => {},
+ Event::ChannelClosed { .. } => {},
+ _ => panic!("Unexpected event: {:?}", event),
+ }
+ Ok(())
};
let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let bg_processor = BackgroundProcessor::start(
persister,
event_handler,
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
- let event_handler = |_: _| {};
+ let event_handler = |_: _| Ok(());
let background_processor = BackgroundProcessor::start(
persister,
event_handler,
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = super::process_events_async(
persister,
- |_: _| async {},
+ |_: _| async { Ok(()) },
nodes[0].chain_monitor.clone(),
nodes[0].node.clone(),
Some(nodes[0].messenger.clone()),
#[test]
fn test_payment_path_scoring() {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
- let event_handler = move |event: Event| match event {
- Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
- Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
- Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
- Event::ProbeFailed { .. } => sender.send(event).unwrap(),
- _ => panic!("Unexpected event: {:?}", event),
+ let event_handler = move |event: Event| {
+ match event {
+ Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
+ Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
+ Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
+ Event::ProbeFailed { .. } => sender.send(event).unwrap(),
+ _ => panic!("Unexpected event: {:?}", event),
+ }
+ Ok(())
};
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
_ => panic!("Unexpected event: {:?}", event),
}
+ Ok(())
}
};
} else {
other_events.borrow_mut().push(event);
}
+ Ok(())
};
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::ln::types::ChannelId;
use crate::sign::ecdsa::EcdsaChannelSigner;
-use crate::events;
-use crate::events::{Event, EventHandler};
+use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use crate::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
- let event_handler = |event: events::Event| events.borrow_mut().push(event);
+ let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
self.process_pending_events(&event_handler);
events.into_inner()
}
/// See the trait-level documentation of [`EventsProvider`] for requirements.
///
/// [`EventsProvider`]: crate::events::EventsProvider
- pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
&self, handler: H
) {
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
for funding_txo in mons_to_process {
let mut ev;
- super::channelmonitor::process_events_body!(
- self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
+ match super::channelmonitor::process_events_body!(
+ self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
+ Ok(()) => {},
+ Err(ReplayEvent ()) => {
+ self.event_notifier.notify();
+ }
+ }
}
}
/// [`BumpTransaction`]: events::Event::BumpTransaction
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
for monitor_state in self.monitors.read().unwrap().values() {
- monitor_state.monitor.process_pending_events(&handler);
+ match monitor_state.monitor.process_pending_events(&handler) {
+ Ok(()) => {},
+ Err(ReplayEvent ()) => {
+ self.event_notifier.notify();
+ }
+ }
}
}
}
use crate::util::logger::{Logger, Record};
use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
use crate::util::byte_utils;
-use crate::events::{ClosureReason, Event, EventHandler};
+use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};
#[allow(unused_imports)]
macro_rules! _process_events_body {
($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
loop {
+ let mut handling_res = Ok(());
let (pending_events, repeated_events);
if let Some(us) = $self_opt {
let mut inner = us.inner.lock().unwrap();
if inner.is_processing_pending_events {
- break;
+ break handling_res;
}
inner.is_processing_pending_events = true;
pending_events = inner.pending_events.clone();
repeated_events = inner.get_repeated_events();
- } else { break; }
- let num_events = pending_events.len();
+ } else { break handling_res; }
- for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+ let mut num_handled_events = 0;
+ for event in pending_events {
$event_to_handle = event;
- $handle_event;
+ match $handle_event {
+ Ok(()) => num_handled_events += 1,
+ Err(e) => {
+ // If we encounter an error we stop handling events and make sure to replay
+ // any unhandled events on the next invocation.
+ handling_res = Err(e);
+ break;
+ }
+ }
+ }
+
+ if handling_res.is_ok() {
+ for event in repeated_events {
+ // For repeated events we ignore any errors as they will be replayed eventually
+ // anyways.
+ $event_to_handle = event;
+ let _ = $handle_event;
+ }
}
if let Some(us) = $self_opt {
let mut inner = us.inner.lock().unwrap();
- inner.pending_events.drain(..num_events);
+ inner.pending_events.drain(..num_handled_events);
inner.is_processing_pending_events = false;
- if !inner.pending_events.is_empty() {
- // If there's more events to process, go ahead and do so.
+ if handling_res.is_ok() && !inner.pending_events.is_empty() {
+ // If there's more events to process and we didn't fail so far, go ahead and do
+ // so.
continue;
}
}
- break;
+ break handling_res;
}
}
}
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
/// order to handle these events.
///
+ /// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
+ ///
/// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
/// [`BumpTransaction`]: crate::events::Event::BumpTransaction
- pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+ pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
let mut ev;
- process_events_body!(Some(self), ev, handler.handle_event(ev));
+ process_events_body!(Some(self), ev, handler.handle_event(ev))
}
/// Processes any events asynchronously.
///
/// See [`Self::process_pending_events`] for more information.
- pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
&self, handler: &H
- ) {
+ ) -> Result<(), ReplayEvent> {
let mut ev;
- process_events_body!(Some(self), ev, { handler(ev).await });
+ process_events_body!(Some(self), ev, { handler(ev).await })
}
#[cfg(test)]
///
/// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
/// and replay any unhandled events on startup. An [`Event`] is considered handled when
-/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
-/// relevant changes to disk *before* returning.
+/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
+/// persist any relevant changes to disk *before* returning `Ok(())`. In case of an error (e.g.,
+/// persistence failure) implementors should return `Err(ReplayEvent())`, signalling to the
+/// [`EventsProvider`] to replay unhandled events on the next invocation (generally immediately).
+/// Note that some events might not be replayed, please refer to the documentation for
+/// the individual [`Event`] variants for more detail.
///
/// Further, because an application may crash between an [`Event`] being handled and the
/// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
}
+/// An error type that may be returned to LDK in order to safely abort event handling if it can't
+/// currently succeed (e.g., due to a persistence failure).
+///
+/// LDK will ensure the event is persisted and will eventually be replayed.
+#[derive(Clone, Copy, Debug)]
+pub struct ReplayEvent();
+
/// A trait implemented for objects handling events from [`EventsProvider`].
///
/// An async variation also exists for implementations of [`EventsProvider`] that support async
/// event handling. The async event handler should satisfy the generic bounds: `F:
-/// core::future::Future, H: Fn(Event) -> F`.
+/// core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> F`.
pub trait EventHandler {
/// Handles the given [`Event`].
///
/// See [`EventsProvider`] for details that must be considered when implementing this method.
- fn handle_event(&self, event: Event);
+ fn handle_event(&self, event: Event) -> Result<(), ReplayEvent>;
}
-impl<F> EventHandler for F where F: Fn(Event) {
- fn handle_event(&self, event: Event) {
+impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent> {
+ fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
self(event)
}
}
impl<T: EventHandler> EventHandler for Arc<T> {
- fn handle_event(&self, event: Event) {
+ fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
self.deref().handle_event(event)
}
}
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::events;
-use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason};
+use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent};
// Since this struct is returned in `list_channels` methods, expose it here in case users want to
// construct one themselves.
use crate::ln::inbound_payment;
/// }
///
/// // On the event processing thread once the peer has responded
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::FundingGenerationReady {
-/// temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
-/// user_channel_id, ..
-/// } => {
-/// assert_eq!(user_channel_id, 42);
-/// let funding_transaction = wallet.create_funding_transaction(
-/// channel_value_satoshis, output_script
-/// );
-/// match channel_manager.funding_transaction_generated(
-/// &temporary_channel_id, &counterparty_node_id, funding_transaction
-/// ) {
-/// Ok(()) => println!("Funding channel {}", temporary_channel_id),
-/// Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
-/// }
-/// },
-/// Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
-/// assert_eq!(user_channel_id, 42);
-/// println!(
-/// "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
-/// former_temporary_channel_id.unwrap()
-/// );
-/// },
-/// Event::ChannelReady { channel_id, user_channel_id, .. } => {
-/// assert_eq!(user_channel_id, 42);
-/// println!("Channel {} ready", channel_id);
-/// },
-/// // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::FundingGenerationReady {
+/// temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+/// user_channel_id, ..
+/// } => {
+/// assert_eq!(user_channel_id, 42);
+/// let funding_transaction = wallet.create_funding_transaction(
+/// channel_value_satoshis, output_script
+/// );
+/// match channel_manager.funding_transaction_generated(
+/// &temporary_channel_id, &counterparty_node_id, funding_transaction
+/// ) {
+/// Ok(()) => println!("Funding channel {}", temporary_channel_id),
+/// Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+/// }
+/// },
+/// Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+/// assert_eq!(user_channel_id, 42);
+/// println!(
+/// "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+/// former_temporary_channel_id.unwrap()
+/// );
+/// },
+/// Event::ChannelReady { channel_id, user_channel_id, .. } => {
+/// assert_eq!(user_channel_id, 42);
+/// println!("Channel {} ready", channel_id);
+/// },
+/// // ...
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
/// # fn example<T: AChannelManager>(channel_manager: T) {
/// # let channel_manager = channel_manager.get_cm();
/// # let error_message = "Channel force-closed";
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
-/// if !is_trusted(counterparty_node_id) {
-/// match channel_manager.force_close_without_broadcasting_txn(
-/// &temporary_channel_id, &counterparty_node_id, error_message.to_string()
-/// ) {
-/// Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
-/// Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
+/// if !is_trusted(counterparty_node_id) {
+/// match channel_manager.force_close_without_broadcasting_txn(
+/// &temporary_channel_id, &counterparty_node_id, error_message.to_string()
+/// ) {
+/// Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+/// Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+/// }
+/// return Ok(());
/// }
-/// return;
-/// }
///
-/// let user_channel_id = 43;
-/// match channel_manager.accept_inbound_channel(
-/// &temporary_channel_id, &counterparty_node_id, user_channel_id
-/// ) {
-/// Ok(()) => println!("Accepting channel {}", temporary_channel_id),
-/// Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
-/// }
-/// },
-/// // ...
-/// # _ => {},
+/// let user_channel_id = 43;
+/// match channel_manager.accept_inbound_channel(
+/// &temporary_channel_id, &counterparty_node_id, user_channel_id
+/// ) {
+/// Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+/// Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+/// }
+/// },
+/// // ...
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
/// }
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::ChannelClosed { channel_id, user_channel_id, .. } => {
-/// assert_eq!(user_channel_id, 42);
-/// println!("Channel {} closed", channel_id);
-/// },
-/// // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::ChannelClosed { channel_id, user_channel_id, .. } => {
+/// assert_eq!(user_channel_id, 42);
+/// println!("Channel {} closed", channel_id);
+/// },
+/// // ...
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
/// };
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
-/// assert_eq!(payment_hash, known_payment_hash);
-/// println!("Claiming payment {}", payment_hash);
-/// channel_manager.claim_funds(payment_preimage);
-/// },
-/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
-/// println!("Unknown payment hash: {}", payment_hash);
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+/// assert_eq!(payment_hash, known_payment_hash);
+/// println!("Claiming payment {}", payment_hash);
+/// channel_manager.claim_funds(payment_preimage);
+/// },
+/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+/// println!("Unknown payment hash: {}", payment_hash);
+/// },
+/// PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+/// assert_ne!(payment_hash, known_payment_hash);
+/// println!("Claiming spontaneous payment {}", payment_hash);
+/// channel_manager.claim_funds(payment_preimage);
+/// },
+/// // ...
+/// # _ => {},
/// },
-/// PaymentPurpose::SpontaneousPayment(payment_preimage) => {
-/// assert_ne!(payment_hash, known_payment_hash);
-/// println!("Claiming spontaneous payment {}", payment_hash);
-/// channel_manager.claim_funds(payment_preimage);
+/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+/// assert_eq!(payment_hash, known_payment_hash);
+/// println!("Claimed {} msats", amount_msat);
/// },
/// // ...
-/// # _ => {},
-/// },
-/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-/// assert_eq!(payment_hash, known_payment_hash);
-/// println!("Claimed {} msats", amount_msat);
-/// },
-/// // ...
-/// # _ => {},
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
/// );
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
-/// Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
-/// // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+/// Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+/// // ...
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
/// let bech32_offer = offer.to_string();
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
-/// println!("Claiming payment {}", payment_hash);
-/// channel_manager.claim_funds(payment_preimage);
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
+/// println!("Claiming payment {}", payment_hash);
+/// channel_manager.claim_funds(payment_preimage);
+/// },
+/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
+/// println!("Unknown payment hash: {}", payment_hash);
+/// }
+/// # _ => {},
/// },
-/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
-/// println!("Unknown payment hash: {}", payment_hash);
+/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+/// println!("Claimed {} msats", amount_msat);
/// },
/// // ...
-/// # _ => {},
-/// },
-/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-/// println!("Claimed {} msats", amount_msat);
-/// },
-/// // ...
-/// # _ => {},
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # Ok(())
/// # }
/// );
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-/// Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-/// // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+/// Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+/// // ...
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
/// );
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-/// // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+/// // ...
+/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # Ok(())
/// # }
/// };
///
/// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
-/// assert_eq!(payment_hash, known_payment_hash);
-/// println!("Claiming payment {}", payment_hash);
-/// channel_manager.claim_funds(payment_preimage);
-/// },
-/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
-/// println!("Unknown payment hash: {}", payment_hash);
-/// },
-/// // ...
-/// # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+/// match event {
+/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
+/// assert_eq!(payment_hash, known_payment_hash);
+/// println!("Claiming payment {}", payment_hash);
+/// channel_manager.claim_funds(payment_preimage);
+/// },
+/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
+/// println!("Unknown payment hash: {}", payment_hash);
+/// },
+/// // ...
+/// # _ => {},
/// },
/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
/// assert_eq!(payment_hash, known_payment_hash);
/// },
/// // ...
/// # _ => {},
+/// }
+/// Ok(())
/// });
/// # }
/// ```
macro_rules! process_events_body {
($self: expr, $event_to_handle: expr, $handle_event: expr) => {
+ let mut handling_failed = false;
let mut processed_all_events = false;
- while !processed_all_events {
+ while !handling_failed && !processed_all_events {
if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
return;
}
}
let pending_events = $self.pending_events.lock().unwrap().clone();
- let num_events = pending_events.len();
if !pending_events.is_empty() {
result = NotifyOption::DoPersist;
}
let mut post_event_actions = Vec::new();
+ let mut num_handled_events = 0;
for (event, action_opt) in pending_events {
$event_to_handle = event;
- $handle_event;
- if let Some(action) = action_opt {
- post_event_actions.push(action);
+ match $handle_event {
+ Ok(()) => {
+ if let Some(action) = action_opt {
+ post_event_actions.push(action);
+ }
+ num_handled_events += 1;
+ }
+ Err(_e) => {
+ // If we encounter an error we stop handling events and make sure to replay
+ // any unhandled events on the next invocation.
+ handling_failed = true;
+ break;
+ }
}
}
{
let mut pending_events = $self.pending_events.lock().unwrap();
- pending_events.drain(..num_events);
+ pending_events.drain(..num_handled_events);
processed_all_events = pending_events.is_empty();
// Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
// updated here with the `pending_events` lock acquired.
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
let events = core::cell::RefCell::new(Vec::new());
- let event_handler = |event: events::Event| events.borrow_mut().push(event);
+ let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
self.process_pending_events(&event_handler);
events.into_inner()
}
/// using the given event handler.
///
/// See the trait-level documentation of [`EventsProvider`] for requirements.
- pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
&self, handler: H
) {
let mut ev;
fn release_events(node: &MessengerNode) -> Vec<Event> {
let events = core::cell::RefCell::new(Vec::new());
- node.messenger.process_pending_events(&|e| events.borrow_mut().push(e));
+ node.messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
events.into_inner()
}
use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, MessageContext, OffersContext, ReceiveTlvs};
use crate::blinded_path::utils;
-use crate::events::{Event, EventHandler, EventsProvider};
+use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
use crate::sign::{EntropySource, NodeSigner, Recipient};
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
use super::packet::ParsedOnionMessageContents;
use super::offers::OffersMessageHandler;
use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
+use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
use crate::util::logger::{Logger, WithContext};
use crate::util::ser::Writeable;
/// have an ordering requirement.
///
/// See the trait-level documentation of [`EventsProvider`] for requirements.
- pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
&self, handler: H
) {
let mut intercepted_msgs = Vec::new();
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
- futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+ let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
+ futures.push(future);
}
}
}
for ev in intercepted_msgs {
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
- futures.push(Some(handler(ev)));
+ let future = ResultFuture::Pending(handler(ev));
+ futures.push(future);
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
- crate::util::async_poll::MultiFuturePoller(futures).await;
+ MultiResultFuturePoller::new(futures).await;
if peer_connecteds.len() <= 1 {
for event in peer_connecteds { handler(event).await; }
} else {
let mut futures = Vec::new();
for event in peer_connecteds {
- futures.push(Some(handler(event)));
+ let future = ResultFuture::Pending(handler(event));
+ futures.push(future);
}
- crate::util::async_poll::MultiFuturePoller(futures).await;
+ MultiResultFuturePoller::new(futures).await;
}
}
}
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
- handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+ let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
}
}
}
use core::pin::Pin;
use core::task::{Context, Poll};
-pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
+pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
+ Pending(F),
+ Ready(Result<(), E>),
+}
+
+pub(crate) struct MultiResultFuturePoller<
+ F: Future<Output = Result<(), E>> + Unpin,
+ E: Copy + Unpin,
+> {
+ futures_state: Vec<ResultFuture<F, E>>,
+}
-impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
- type Output = ();
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> {
+ pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
+ Self { futures_state }
+ }
+}
+
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
+ for MultiResultFuturePoller<F, E>
+{
+ type Output = Vec<Result<(), E>>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
let mut have_pending_futures = false;
- for fut_option in self.get_mut().0.iter_mut() {
- let mut fut = match fut_option.take() {
- None => continue,
- Some(fut) => fut,
- };
- match Pin::new(&mut fut).poll(cx) {
- Poll::Ready(()) => {},
- Poll::Pending => {
- have_pending_futures = true;
- *fut_option = Some(fut);
+ let futures_state = &mut self.get_mut().futures_state;
+ for state in futures_state.iter_mut() {
+ match state {
+ ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
+ Poll::Ready(res) => {
+ *state = ResultFuture::Ready(res);
+ },
+ Poll::Pending => {
+ have_pending_futures = true;
+ },
},
+ ResultFuture::Ready(_) => continue,
}
}
+
if have_pending_futures {
Poll::Pending
} else {
- Poll::Ready(())
+ let results = futures_state
+ .drain(..)
+ .filter_map(|e| match e {
+ ResultFuture::Ready(res) => Some(res),
+ ResultFuture::Pending(_) => {
+ debug_assert!(
+ false,
+ "All futures are expected to be ready if none are pending"
+ );
+ None
+ },
+ })
+ .collect();
+ Poll::Ready(results)
}
}
}