X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchannelmonitor.rs;h=0e87f3569e605d0c25736a9d203bc1fe6170655f;hb=refs%2Fheads%2Fupstream%2Fmain;hp=a037a965a8fbaac1f5419ed5acc41bb83617a594;hpb=38285167b1ddcb9c69e7b30b3ef4e2c2be00d986;p=rust-lightning diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index a037a965..5ecea825 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -51,7 +51,7 @@ use crate::chain::Filter; use crate::util::logger::{Logger, Record}; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48}; use crate::util::byte_utils; -use crate::events::{ClosureReason, Event, EventHandler}; +use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent}; use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent}; #[allow(unused_imports)] @@ -189,7 +189,7 @@ pub enum MonitorEvent { monitor_update_id: u64, }, } -impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, +impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent, // Note that Completed is currently never serialized to disk as it is generated only in // ChainMonitor. (0, Completed) => { @@ -1159,34 +1159,53 @@ impl Writeable for ChannelMonitorImpl { macro_rules! _process_events_body { ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => { loop { + let mut handling_res = Ok(()); let (pending_events, repeated_events); if let Some(us) = $self_opt { let mut inner = us.inner.lock().unwrap(); if inner.is_processing_pending_events { - break; + break handling_res; } inner.is_processing_pending_events = true; pending_events = inner.pending_events.clone(); repeated_events = inner.get_repeated_events(); - } else { break; } - let num_events = pending_events.len(); + } else { break handling_res; } - for event in pending_events.into_iter().chain(repeated_events.into_iter()) { + let mut num_handled_events = 0; + for event in pending_events { $event_to_handle = event; - $handle_event; + match $handle_event { + Ok(()) => num_handled_events += 1, + Err(e) => { + // If we encounter an error we stop handling events and make sure to replay + // any unhandled events on the next invocation. + handling_res = Err(e); + break; + } + } + } + + if handling_res.is_ok() { + for event in repeated_events { + // For repeated events we ignore any errors as they will be replayed eventually + // anyways. + $event_to_handle = event; + let _ = $handle_event; + } } if let Some(us) = $self_opt { let mut inner = us.inner.lock().unwrap(); - inner.pending_events.drain(..num_events); + inner.pending_events.drain(..num_handled_events); inner.is_processing_pending_events = false; - if !inner.pending_events.is_empty() { - // If there's more events to process, go ahead and do so. + if handling_res.is_ok() && !inner.pending_events.is_empty() { + // If there's more events to process and we didn't fail so far, go ahead and do + // so. continue; } } - break; + break handling_res; } } } @@ -1498,21 +1517,23 @@ impl ChannelMonitor { /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in /// order to handle these events. /// + /// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried. + /// /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs /// [`BumpTransaction`]: crate::events::Event::BumpTransaction - pub fn process_pending_events(&self, handler: &H) where H::Target: EventHandler { + pub fn process_pending_events(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler { let mut ev; - process_events_body!(Some(self), ev, handler.handle_event(ev)); + process_events_body!(Some(self), ev, handler.handle_event(ev)) } /// Processes any events asynchronously. /// /// See [`Self::process_pending_events`] for more information. - pub async fn process_pending_events_async Future>( + pub async fn process_pending_events_async>, H: Fn(Event) -> Future>( &self, handler: &H - ) { + ) -> Result<(), ReplayEvent> { let mut ev; - process_events_body!(Some(self), ev, { handler(ev).await }); + process_events_body!(Some(self), ev, { handler(ev).await }) } #[cfg(test)] @@ -1812,6 +1833,12 @@ impl ChannelMonitor { ); } + /// Returns true if the monitor has pending claim requests that are not fully confirmed yet. + pub fn has_pending_claims(&self) -> bool + { + self.inner.lock().unwrap().onchain_tx_handler.has_pending_claims() + } + /// Triggers rebroadcasts of pending claims from a force-closed channel after a transaction /// signature generation failure. pub fn signer_unblocked(