Merge pull request #3152 from alecchendev/2024-06-async-commit-secret-raa
[rust-lightning] / lightning / src / chain / channelmonitor.rs
index 6992eb192abf98b35c7e569646893e27a0abe158..5ecea8251004f5f2f1413f0f65033034d5c3fd6d 100644 (file)
@@ -51,7 +51,7 @@ use crate::chain::Filter;
 use crate::util::logger::{Logger, Record};
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::{ClosureReason, Event, EventHandler};
+use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
 use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};
 
 #[allow(unused_imports)]
 use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};
 
 #[allow(unused_imports)]
@@ -189,7 +189,7 @@ pub enum MonitorEvent {
                monitor_update_id: u64,
        },
 }
-impl_writeable_tlv_based_enum_upgradable!(MonitorEvent,
+impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent,
        // Note that Completed is currently never serialized to disk as it is generated only in
        // ChainMonitor.
        (0, Completed) => {
@@ -1159,34 +1159,53 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
 macro_rules! _process_events_body {
        ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
                loop {
+                       let mut handling_res = Ok(());
                        let (pending_events, repeated_events);
                        if let Some(us) = $self_opt {
                                let mut inner = us.inner.lock().unwrap();
                                if inner.is_processing_pending_events {
-                                       break;
+                                       break handling_res;
                                }
                                inner.is_processing_pending_events = true;
 
                                pending_events = inner.pending_events.clone();
                                repeated_events = inner.get_repeated_events();
-                       } else { break; }
-                       let num_events = pending_events.len();
+                       } else { break handling_res; }
 
 
-                       for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+                       let mut num_handled_events = 0;
+                       for event in pending_events {
                                $event_to_handle = event;
-                               $handle_event;
+                               match $handle_event {
+                                       Ok(()) => num_handled_events += 1,
+                                       Err(e) => {
+                                               // If we encounter an error we stop handling events and make sure to replay
+                                               // any unhandled events on the next invocation.
+                                               handling_res = Err(e);
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if handling_res.is_ok() {
+                               for event in repeated_events {
+                                       // For repeated events we ignore any errors as they will be replayed eventually
+                                       // anyways.
+                                       $event_to_handle = event;
+                                       let _ = $handle_event;
+                               }
                        }
 
                        if let Some(us) = $self_opt {
                                let mut inner = us.inner.lock().unwrap();
-                               inner.pending_events.drain(..num_events);
+                               inner.pending_events.drain(..num_handled_events);
                                inner.is_processing_pending_events = false;
-                               if !inner.pending_events.is_empty() {
-                                       // If there's more events to process, go ahead and do so.
+                               if handling_res.is_ok() && !inner.pending_events.is_empty() {
+                                       // If there's more events to process and we didn't fail so far, go ahead and do
+                                       // so.
                                        continue;
                                }
                        }
-                       break;
+                       break handling_res;
                }
        }
 }
@@ -1498,21 +1517,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
        /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
        /// order to handle these events.
        ///
+       /// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
+       ///
        /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
        /// [`BumpTransaction`]: crate::events::Event::BumpTransaction
-       pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+       pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
                let mut ev;
-               process_events_body!(Some(self), ev, handler.handle_event(ev));
+               process_events_body!(Some(self), ev, handler.handle_event(ev))
        }
 
        /// Processes any events asynchronously.
        ///
        /// See [`Self::process_pending_events`] for more information.
-       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
                &self, handler: &H
-       ) {
+       ) -> Result<(), ReplayEvent> {
                let mut ev;
-               process_events_body!(Some(self), ev, { handler(ev).await });
+               process_events_body!(Some(self), ev, { handler(ev).await })
        }
 
        #[cfg(test)]
@@ -1812,6 +1833,12 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
                );
        }
 
+       /// Returns true if the monitor has pending claim requests that are not fully confirmed yet.
+       pub fn has_pending_claims(&self) -> bool
+       {
+               self.inner.lock().unwrap().onchain_tx_handler.has_pending_claims()
+       }
+
        /// Triggers rebroadcasts of pending claims from a force-closed channel after a transaction
        /// signature generation failure.
        pub fn signer_unblocked<B: Deref, F: Deref, L: Deref>(
@@ -1924,9 +1951,9 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
        }
 
        #[cfg(test)]
-       pub fn do_signer_call<F: FnMut(&Signer) -> ()>(&self, mut f: F) {
-               let inner = self.inner.lock().unwrap();
-               f(&inner.onchain_tx_handler.signer);
+       pub fn do_mut_signer_call<F: FnMut(&mut Signer) -> ()>(&self, mut f: F) {
+               let mut inner = self.inner.lock().unwrap();
+               f(&mut inner.onchain_tx_handler.signer);
        }
 }
 
@@ -2873,7 +2900,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
                F::Target: FeeEstimator,
                L::Target: Logger,
        {
-               let (claimable_outpoints, _) = self.generate_claimable_outpoints_and_watch_outputs(ClosureReason::HolderForceClosed);
+               let (claimable_outpoints, _) = self.generate_claimable_outpoints_and_watch_outputs(ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) });
                self.onchain_tx_handler.update_claims_view_from_requests(
                        claimable_outpoints, self.best_block.height, self.best_block.height, broadcaster,
                        fee_estimator, logger
@@ -3105,9 +3132,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
                                                        },
                                                        commitment_txid: htlc.commitment_txid,
                                                        per_commitment_number: htlc.per_commitment_number,
-                                                       per_commitment_point: self.onchain_tx_handler.signer.get_per_commitment_point(
-                                                               htlc.per_commitment_number, &self.onchain_tx_handler.secp_ctx,
-                                                       ),
+                                                       per_commitment_point: htlc.per_commitment_point,
                                                        feerate_per_kw: 0,
                                                        htlc: htlc.htlc,
                                                        preimage: htlc.preimage,