]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Make event handling fallible
authorElias Rohrer <dev@tnull.de>
Mon, 15 Apr 2024 08:35:52 +0000 (10:35 +0200)
committerElias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 13:54:21 +0000 (15:54 +0200)
Previously, we would require our users to handle all events
successfully inline or panic will trying to do so. If they would exit
the `EventHandler` any other way we'd forget about the event and
wouldn't replay them after restart.

Here, we implement fallible event handling, allowing the user to return
`Err(())` which signals to our event providers they should abort event
processing and replay any unhandled events later (i.e., in the next
invocation).

lightning-background-processor/src/lib.rs
lightning-invoice/src/utils.rs
lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs
lightning/src/events/mod.rs
lightning/src/ln/channelmanager.rs
lightning/src/onion_message/functional_tests.rs
lightning/src/onion_message/messenger.rs
lightning/src/util/async_poll.rs

index 940d1b029e730045d25afd775a6cfe17461f27b6..10f5ada505e87aa899a7fe37ec01788ef029d2c0 100644 (file)
@@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::events::EventHandler;
 #[cfg(feature = "std")]
 use lightning::events::EventsProvider;
+#[cfg(feature = "futures")]
+use lightning::events::ReplayEvent;
 use lightning::events::{Event, PathFailure};
 
 use lightning::ln::channelmanager::AChannelManager;
@@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
+/// # use lightning::events::ReplayEvent;
 /// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
 /// # use std::time::SystemTime;
@@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
 /// # }
 /// # struct EventHandler {}
 /// # impl EventHandler {
-/// #     async fn handle_event(&self, _: lightning::events::Event) {}
+/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
 /// # }
 /// # #[derive(Eq, PartialEq, Clone, Hash)]
 /// # struct SocketDescriptor {}
@@ -698,7 +701,7 @@ pub async fn process_events_async<
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
-       EventHandlerFuture: core::future::Future<Output = ()>,
+       EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
        M: 'static
@@ -751,12 +754,16 @@ where
                                        if update_scorer(scorer, &event, duration_since_epoch) {
                                                log_trace!(logger, "Persisting scorer after update");
                                                if let Err(e) = persister.persist_scorer(&scorer) {
-                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
+                                                       // We opt not to abort early on persistence failure here as persisting
+                                                       // the scorer is non-critical and we still hope that it will have
+                                                       // resolved itself when it is potentially critical in event handling
+                                                       // below.
                                                }
                                        }
                                }
                        }
-                       event_handler(event).await;
+                       event_handler(event).await
                })
        };
        define_run_body!(
@@ -913,7 +920,7 @@ impl BackgroundProcessor {
                                                }
                                        }
                                }
-                               event_handler.handle_event(event);
+                               event_handler.handle_event(event)
                        };
                        define_run_body!(
                                persister,
@@ -1757,7 +1764,7 @@ mod tests {
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let bg_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -1847,7 +1854,7 @@ mod tests {
                let (_, nodes) = create_nodes(1, "test_timer_tick_called");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let bg_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -1889,7 +1896,7 @@ mod tests {
                let persister = Arc::new(
                        Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
                );
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let bg_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -1924,7 +1931,7 @@ mod tests {
 
                let bp_future = super::process_events_async(
                        persister,
-                       |_: _| async {},
+                       |_: _| async { Ok(()) },
                        nodes[0].chain_monitor.clone(),
                        nodes[0].node.clone(),
                        Some(nodes[0].messenger.clone()),
@@ -1957,7 +1964,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister =
                        Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let bg_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -1986,7 +1993,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister =
                        Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let bg_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -2021,13 +2028,16 @@ mod tests {
                // Set up a background event handler for FundingGenerationReady events.
                let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
                let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: Event| match event {
-                       Event::FundingGenerationReady { .. } => funding_generation_send
-                               .send(handle_funding_generation_ready!(event, channel_value))
-                               .unwrap(),
-                       Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
-                       Event::ChannelReady { .. } => {},
-                       _ => panic!("Unexpected event: {:?}", event),
+               let event_handler = move |event: Event| {
+                       match event {
+                               Event::FundingGenerationReady { .. } => funding_generation_send
+                                       .send(handle_funding_generation_ready!(event, channel_value))
+                                       .unwrap(),
+                               Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
+                               Event::ChannelReady { .. } => {},
+                               _ => panic!("Unexpected event: {:?}", event),
+                       }
+                       Ok(())
                };
 
                let bg_processor = BackgroundProcessor::start(
@@ -2082,11 +2092,14 @@ mod tests {
 
                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: Event| match event {
-                       Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
-                       Event::ChannelReady { .. } => {},
-                       Event::ChannelClosed { .. } => {},
-                       _ => panic!("Unexpected event: {:?}", event),
+               let event_handler = move |event: Event| {
+                       match event {
+                               Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
+                               Event::ChannelReady { .. } => {},
+                               Event::ChannelClosed { .. } => {},
+                               _ => panic!("Unexpected event: {:?}", event),
+                       }
+                       Ok(())
                };
                let persister = Arc::new(Persister::new(data_dir));
                let bg_processor = BackgroundProcessor::start(
@@ -2220,7 +2233,7 @@ mod tests {
                let (_, nodes) = create_nodes(2, "test_scorer_persistence");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let bg_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -2315,7 +2328,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
-               let event_handler = |_: _| {};
+               let event_handler = |_: _| Ok(());
                let background_processor = BackgroundProcessor::start(
                        persister,
                        event_handler,
@@ -2350,7 +2363,7 @@ mod tests {
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
                let bp_future = super::process_events_async(
                        persister,
-                       |_: _| async {},
+                       |_: _| async { Ok(()) },
                        nodes[0].chain_monitor.clone(),
                        nodes[0].node.clone(),
                        Some(nodes[0].messenger.clone()),
@@ -2492,12 +2505,15 @@ mod tests {
        #[test]
        fn test_payment_path_scoring() {
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: Event| match event {
-                       Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
-                       Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
-                       Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
-                       Event::ProbeFailed { .. } => sender.send(event).unwrap(),
-                       _ => panic!("Unexpected event: {:?}", event),
+               let event_handler = move |event: Event| {
+                       match event {
+                               Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
+                               Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
+                               Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
+                               Event::ProbeFailed { .. } => sender.send(event).unwrap(),
+                               _ => panic!("Unexpected event: {:?}", event),
+                       }
+                       Ok(())
                };
 
                let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2543,6 +2559,7 @@ mod tests {
                                        Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
                                        _ => panic!("Unexpected event: {:?}", event),
                                }
+                               Ok(())
                        }
                };
 
index 00b49c371eacabd7b65b5cb6a34e0e74979b8379..fa301a8dc0629de7aa69bbf313154de48a43a74f 100644 (file)
@@ -1391,6 +1391,7 @@ mod test {
                        } else {
                                other_events.borrow_mut().push(event);
                        }
+                       Ok(())
                };
                nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
                nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
index e6bb9d90778ce46b4cd7ba5f25eafb52daba455d..93e1dae6ce3635716b20e97f48f5a682d84f37b9 100644 (file)
@@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::ln::types::ChannelId;
 use crate::sign::ecdsa::EcdsaChannelSigner;
-use crate::events;
-use crate::events::{Event, EventHandler};
+use crate::events::{self, Event, EventHandler, ReplayEvent};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::errors::APIError;
 use crate::util::wakers::{Future, Notifier};
@@ -533,7 +532,7 @@ where C::Target: chain::Filter,
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                use crate::events::EventsProvider;
                let events = core::cell::RefCell::new(Vec::new());
-               let event_handler = |event: events::Event| events.borrow_mut().push(event);
+               let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
@@ -544,7 +543,7 @@ where C::Target: chain::Filter,
        /// See the trait-level documentation of [`EventsProvider`] for requirements.
        ///
        /// [`EventsProvider`]: crate::events::EventsProvider
-       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
                // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
@@ -552,8 +551,13 @@ where C::Target: chain::Filter,
                let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
                for funding_txo in mons_to_process {
                        let mut ev;
-                       super::channelmonitor::process_events_body!(
-                               self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
+                       match super::channelmonitor::process_events_body!(
+                               self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
+                               Ok(()) => {},
+                               Err(ReplayEvent ()) => {
+                                       self.event_notifier.notify();
+                               }
+                       }
                }
        }
 
@@ -880,7 +884,12 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
        /// [`BumpTransaction`]: events::Event::BumpTransaction
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
                for monitor_state in self.monitors.read().unwrap().values() {
-                       monitor_state.monitor.process_pending_events(&handler);
+                       match monitor_state.monitor.process_pending_events(&handler) {
+                               Ok(()) => {},
+                               Err(ReplayEvent ()) => {
+                                       self.event_notifier.notify();
+                               }
+                       }
                }
        }
 }
index 13f2ff044a2d412968bd7e48a5ae256fc70dda16..5ecea8251004f5f2f1413f0f65033034d5c3fd6d 100644 (file)
@@ -51,7 +51,7 @@ use crate::chain::Filter;
 use crate::util::logger::{Logger, Record};
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::{ClosureReason, Event, EventHandler};
+use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
 use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};
 
 #[allow(unused_imports)]
@@ -1159,34 +1159,53 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
 macro_rules! _process_events_body {
        ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
                loop {
+                       let mut handling_res = Ok(());
                        let (pending_events, repeated_events);
                        if let Some(us) = $self_opt {
                                let mut inner = us.inner.lock().unwrap();
                                if inner.is_processing_pending_events {
-                                       break;
+                                       break handling_res;
                                }
                                inner.is_processing_pending_events = true;
 
                                pending_events = inner.pending_events.clone();
                                repeated_events = inner.get_repeated_events();
-                       } else { break; }
-                       let num_events = pending_events.len();
+                       } else { break handling_res; }
 
-                       for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+                       let mut num_handled_events = 0;
+                       for event in pending_events {
                                $event_to_handle = event;
-                               $handle_event;
+                               match $handle_event {
+                                       Ok(()) => num_handled_events += 1,
+                                       Err(e) => {
+                                               // If we encounter an error we stop handling events and make sure to replay
+                                               // any unhandled events on the next invocation.
+                                               handling_res = Err(e);
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if handling_res.is_ok() {
+                               for event in repeated_events {
+                                       // For repeated events we ignore any errors as they will be replayed eventually
+                                       // anyways.
+                                       $event_to_handle = event;
+                                       let _ = $handle_event;
+                               }
                        }
 
                        if let Some(us) = $self_opt {
                                let mut inner = us.inner.lock().unwrap();
-                               inner.pending_events.drain(..num_events);
+                               inner.pending_events.drain(..num_handled_events);
                                inner.is_processing_pending_events = false;
-                               if !inner.pending_events.is_empty() {
-                                       // If there's more events to process, go ahead and do so.
+                               if handling_res.is_ok() && !inner.pending_events.is_empty() {
+                                       // If there's more events to process and we didn't fail so far, go ahead and do
+                                       // so.
                                        continue;
                                }
                        }
-                       break;
+                       break handling_res;
                }
        }
 }
@@ -1498,21 +1517,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
        /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
        /// order to handle these events.
        ///
+       /// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
+       ///
        /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
        /// [`BumpTransaction`]: crate::events::Event::BumpTransaction
-       pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+       pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
                let mut ev;
-               process_events_body!(Some(self), ev, handler.handle_event(ev));
+               process_events_body!(Some(self), ev, handler.handle_event(ev))
        }
 
        /// Processes any events asynchronously.
        ///
        /// See [`Self::process_pending_events`] for more information.
-       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
                &self, handler: &H
-       ) {
+       ) -> Result<(), ReplayEvent> {
                let mut ev;
-               process_events_body!(Some(self), ev, { handler(ev).await });
+               process_events_body!(Some(self), ev, { handler(ev).await })
        }
 
        #[cfg(test)]
index acee931138f13bedd00c9a7c9a43f33c5bbcd6c8..e51a0972cdcdfb67c7df06da7e99ebbeb9148590 100644 (file)
@@ -2300,8 +2300,12 @@ pub trait MessageSendEventsProvider {
 ///
 /// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
 /// and replay any unhandled events on startup. An [`Event`] is considered handled when
-/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
-/// relevant changes to disk *before* returning.
+/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
+/// persist any relevant changes to disk *before* returning `Ok(())`. In case of an error (e.g.,
+/// persistence failure) implementors should return `Err(ReplayEvent())`, signalling to the
+/// [`EventsProvider`] to replay unhandled events on the next invocation (generally immediately).
+/// Note that some events might not be replayed, please refer to the documentation for
+/// the individual [`Event`] variants for more detail.
 ///
 /// Further, because an application may crash between an [`Event`] being handled and the
 /// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2328,26 +2332,33 @@ pub trait EventsProvider {
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
 }
 
+/// An error type that may be returned to LDK in order to safely abort event handling if it can't
+/// currently succeed (e.g., due to a persistence failure).
+///
+/// LDK will ensure the event is persisted and will eventually be replayed.
+#[derive(Clone, Copy, Debug)]
+pub struct ReplayEvent();
+
 /// A trait implemented for objects handling events from [`EventsProvider`].
 ///
 /// An async variation also exists for implementations of [`EventsProvider`] that support async
 /// event handling. The async event handler should satisfy the generic bounds: `F:
-/// core::future::Future, H: Fn(Event) -> F`.
+/// core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> F`.
 pub trait EventHandler {
        /// Handles the given [`Event`].
        ///
        /// See [`EventsProvider`] for details that must be considered when implementing this method.
-       fn handle_event(&self, event: Event);
+       fn handle_event(&self, event: Event) -> Result<(), ReplayEvent>;
 }
 
-impl<F> EventHandler for F where F: Fn(Event) {
-       fn handle_event(&self, event: Event) {
+impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent> {
+       fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
                self(event)
        }
 }
 
 impl<T: EventHandler> EventHandler for Arc<T> {
-       fn handle_event(&self, event: Event) {
+       fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
                self.deref().handle_event(event)
        }
 }
index b14b6e608777529c55f44b7feef6184757b0e18e..561053d85b4158b0c5d61546b8a1428ea18cce31 100644 (file)
@@ -41,7 +41,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::events;
-use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason};
+use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::inbound_payment;
@@ -1395,35 +1395,38 @@ where
 /// }
 ///
 /// // On the event processing thread once the peer has responded
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::FundingGenerationReady {
-///         temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
-///         user_channel_id, ..
-///     } => {
-///         assert_eq!(user_channel_id, 42);
-///         let funding_transaction = wallet.create_funding_transaction(
-///             channel_value_satoshis, output_script
-///         );
-///         match channel_manager.funding_transaction_generated(
-///             &temporary_channel_id, &counterparty_node_id, funding_transaction
-///         ) {
-///             Ok(()) => println!("Funding channel {}", temporary_channel_id),
-///             Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!(
-///             "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
-///             former_temporary_channel_id.unwrap()
-///         );
-///     },
-///     Event::ChannelReady { channel_id, user_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} ready", channel_id);
-///     },
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::FundingGenerationReady {
+///             temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+///             user_channel_id, ..
+///         } => {
+///             assert_eq!(user_channel_id, 42);
+///             let funding_transaction = wallet.create_funding_transaction(
+///                 channel_value_satoshis, output_script
+///             );
+///             match channel_manager.funding_transaction_generated(
+///                 &temporary_channel_id, &counterparty_node_id, funding_transaction
+///             ) {
+///                 Ok(()) => println!("Funding channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!(
+///                 "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+///                 former_temporary_channel_id.unwrap()
+///             );
+///         },
+///         Event::ChannelReady { channel_id, user_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} ready", channel_id);
+///         },
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1447,28 +1450,31 @@ where
 /// # fn example<T: AChannelManager>(channel_manager: T) {
 /// # let channel_manager = channel_manager.get_cm();
 /// # let error_message = "Channel force-closed";
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, ..  } => {
-///         if !is_trusted(counterparty_node_id) {
-///             match channel_manager.force_close_without_broadcasting_txn(
-///                 &temporary_channel_id, &counterparty_node_id, error_message.to_string()
-///             ) {
-///                 Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
-///                 Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, ..  } => {
+///             if !is_trusted(counterparty_node_id) {
+///                 match channel_manager.force_close_without_broadcasting_txn(
+///                     &temporary_channel_id, &counterparty_node_id, error_message.to_string()
+///                 ) {
+///                     Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+///                     Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+///                 }
+///                 return Ok(());
 ///             }
-///             return;
-///         }
 ///
-///         let user_channel_id = 43;
-///         match channel_manager.accept_inbound_channel(
-///             &temporary_channel_id, &counterparty_node_id, user_channel_id
-///         ) {
-///             Ok(()) => println!("Accepting channel {}", temporary_channel_id),
-///             Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     // ...
-/// #     _ => {},
+///             let user_channel_id = 43;
+///             match channel_manager.accept_inbound_channel(
+///                 &temporary_channel_id, &counterparty_node_id, user_channel_id
+///             ) {
+///                 Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1497,13 +1503,16 @@ where
 /// }
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::ChannelClosed { channel_id, user_channel_id, ..  } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} closed", channel_id);
-///     },
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::ChannelClosed { channel_id, user_channel_id, ..  } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} closed", channel_id);
+///         },
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1553,30 +1562,33 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+///                 assert_ne!(payment_hash, known_payment_hash);
+///                 println!("Claiming spontaneous payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             // ...
+/// #           _ => {},
 ///         },
-///         PaymentPurpose::SpontaneousPayment(payment_preimage) => {
-///             assert_ne!(payment_hash, known_payment_hash);
-///             println!("Claiming spontaneous payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claimed {} msats", amount_msat);
 ///         },
 ///         // ...
-/// #         _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         assert_eq!(payment_hash, known_payment_hash);
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-/// #     _ => {},
+/// #       _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1619,11 +1631,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
-///     Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+///         Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1657,23 +1672,25 @@ where
 /// let bech32_offer = offer.to_string();
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             }
+/// #           _ => {},
 ///         },
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             println!("Claimed {} msats", amount_msat);
 ///         },
 ///         // ...
-/// #         _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-/// #     _ => {},
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1719,12 +1736,15 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1779,11 +1799,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1809,18 +1832,19 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///            PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///            PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///            },
-///         // ...
-/// #         _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             // ...
+/// #           _ => {},
 ///     },
 ///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
 ///         assert_eq!(payment_hash, known_payment_hash);
@@ -1828,6 +1852,8 @@ where
 ///     },
 ///     // ...
 /// #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -2831,8 +2857,9 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
        ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
+               let mut handling_failed = false;
                let mut processed_all_events = false;
-               while !processed_all_events {
+               while !handling_failed && !processed_all_events {
                        if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
                                return;
                        }
@@ -2856,24 +2883,34 @@ macro_rules! process_events_body {
                        }
 
                        let pending_events = $self.pending_events.lock().unwrap().clone();
-                       let num_events = pending_events.len();
                        if !pending_events.is_empty() {
                                result = NotifyOption::DoPersist;
                        }
 
                        let mut post_event_actions = Vec::new();
 
+                       let mut num_handled_events = 0;
                        for (event, action_opt) in pending_events {
                                $event_to_handle = event;
-                               $handle_event;
-                               if let Some(action) = action_opt {
-                                       post_event_actions.push(action);
+                               match $handle_event {
+                                       Ok(()) => {
+                                               if let Some(action) = action_opt {
+                                                       post_event_actions.push(action);
+                                               }
+                                               num_handled_events += 1;
+                                       }
+                                       Err(_e) => {
+                                               // If we encounter an error we stop handling events and make sure to replay
+                                               // any unhandled events on the next invocation.
+                                               handling_failed = true;
+                                               break;
+                                       }
                                }
                        }
 
                        {
                                let mut pending_events = $self.pending_events.lock().unwrap();
-                               pending_events.drain(..num_events);
+                               pending_events.drain(..num_handled_events);
                                processed_all_events = pending_events.is_empty();
                                // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
                                // updated here with the `pending_events` lock acquired.
@@ -9240,7 +9277,7 @@ where
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                let events = core::cell::RefCell::new(Vec::new());
-               let event_handler = |event: events::Event| events.borrow_mut().push(event);
+               let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
@@ -9347,7 +9384,7 @@ where
        /// using the given event handler.
        ///
        /// See the trait-level documentation of [`EventsProvider`] for requirements.
-       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
                let mut ev;
index 371b4f5879dd521f365771a9b5d55649bb92f404..16e62bf33f4f3d6376cea774add8341dcf71fcb2 100644 (file)
@@ -307,7 +307,7 @@ fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) {
 
 fn release_events(node: &MessengerNode) -> Vec<Event> {
        let events = core::cell::RefCell::new(Vec::new());
-       node.messenger.process_pending_events(&|e| events.borrow_mut().push(e));
+       node.messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
        events.into_inner()
 }
 
index 75d92610ae597740fbde450c0c6c0b8be6531ae6..3a94d6c26de05c466ccf12ad109b2767b3396f93 100644 (file)
@@ -18,7 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
 use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, MessageContext, OffersContext, ReceiveTlvs};
 use crate::blinded_path::utils;
-use crate::events::{Event, EventHandler, EventsProvider};
+use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
 use crate::sign::{EntropySource, NodeSigner, Recipient};
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
@@ -31,6 +31,7 @@ use super::packet::OnionMessageContents;
 use super::packet::ParsedOnionMessageContents;
 use super::offers::OffersMessageHandler;
 use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
+use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::ser::Writeable;
 
@@ -1328,7 +1329,7 @@ where
        /// have an ordering requirement.
        ///
        /// See the trait-level documentation of [`EventsProvider`] for requirements.
-       pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
                let mut intercepted_msgs = Vec::new();
@@ -1346,26 +1347,29 @@ where
                for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
                        if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
                                if let Some(addresses) = addresses.take() {
-                                       futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+                                       let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
+                                       futures.push(future);
                                }
                        }
                }
 
                for ev in intercepted_msgs {
                        if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
-                       futures.push(Some(handler(ev)));
+                       let future = ResultFuture::Pending(handler(ev));
+                       futures.push(future);
                }
                // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
-               crate::util::async_poll::MultiFuturePoller(futures).await;
+               MultiResultFuturePoller::new(futures).await;
 
                if peer_connecteds.len() <= 1 {
                        for event in peer_connecteds { handler(event).await; }
                } else {
                        let mut futures = Vec::new();
                        for event in peer_connecteds {
-                               futures.push(Some(handler(event)));
+                               let future = ResultFuture::Pending(handler(event));
+                               futures.push(future);
                        }
-                       crate::util::async_poll::MultiFuturePoller(futures).await;
+                       MultiResultFuturePoller::new(futures).await;
                }
        }
 }
@@ -1409,7 +1413,7 @@ where
                for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
                        if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
                                if let Some(addresses) = addresses.take() {
-                                       handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+                                       let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
                                }
                        }
                }
index 7a368af7baebaec3e2dd24a2aff6b5c0c34af76b..c18ada73a475f9f76e555691d8c5438b52bf0cab 100644 (file)
@@ -15,29 +15,62 @@ use core::marker::Unpin;
 use core::pin::Pin;
 use core::task::{Context, Poll};
 
-pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
+pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
+       Pending(F),
+       Ready(Result<(), E>),
+}
+
+pub(crate) struct MultiResultFuturePoller<
+       F: Future<Output = Result<(), E>> + Unpin,
+       E: Copy + Unpin,
+> {
+       futures_state: Vec<ResultFuture<F, E>>,
+}
 
-impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
-       type Output = ();
-       fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> {
+       pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
+               Self { futures_state }
+       }
+}
+
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
+       for MultiResultFuturePoller<F, E>
+{
+       type Output = Vec<Result<(), E>>;
+       fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
                let mut have_pending_futures = false;
-               for fut_option in self.get_mut().0.iter_mut() {
-                       let mut fut = match fut_option.take() {
-                               None => continue,
-                               Some(fut) => fut,
-                       };
-                       match Pin::new(&mut fut).poll(cx) {
-                               Poll::Ready(()) => {},
-                               Poll::Pending => {
-                                       have_pending_futures = true;
-                                       *fut_option = Some(fut);
+               let futures_state = &mut self.get_mut().futures_state;
+               for state in futures_state.iter_mut() {
+                       match state {
+                               ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
+                                       Poll::Ready(res) => {
+                                               *state = ResultFuture::Ready(res);
+                                       },
+                                       Poll::Pending => {
+                                               have_pending_futures = true;
+                                       },
                                },
+                               ResultFuture::Ready(_) => continue,
                        }
                }
+
                if have_pending_futures {
                        Poll::Pending
                } else {
-                       Poll::Ready(())
+                       let results = futures_state
+                               .drain(..)
+                               .filter_map(|e| match e {
+                                       ResultFuture::Ready(res) => Some(res),
+                                       ResultFuture::Pending(_) => {
+                                               debug_assert!(
+                                                       false,
+                                                       "All futures are expected to be ready if none are pending"
+                                               );
+                                               None
+                                       },
+                               })
+                               .collect();
+                       Poll::Ready(results)
                }
        }
 }