Track an `EventCompletionAction` for after an `Event` is processed
authorMatt Corallo <git@bluematt.me>
Fri, 28 Apr 2023 04:24:25 +0000 (04:24 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 2 May 2023 17:59:22 +0000 (17:59 +0000)
This will allow us to block `ChannelMonitorUpdate`s on `Event`
processing in the next commit.

Note that this gets dangerously close to breaking forwards
compatibility - if we have an `Event` with an
`EventCompletionAction` tied to it, we persist a new, even, TLV in
the `ChannelManager`. Hopefully this should be uncommon, as it
implies an `Event` was delayed until after a full round-trip to a
peer.

lightning/src/ln/channelmanager.rs
lightning/src/ln/outbound_payment.rs

index 42211408984c2aeb20cc20dec2a48dd58686db3e..dc824be7b7d0532885cce8c36f38cb2fbd57a910 100644 (file)
@@ -521,6 +521,20 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
        (2, EmitEvent) => { (0, event, upgradable_required) },
 );
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) enum EventCompletionAction {
+       ReleaseRAAChannelMonitorUpdate {
+               counterparty_node_id: PublicKey,
+               channel_funding_outpoint: OutPoint,
+       },
+}
+impl_writeable_tlv_based_enum!(EventCompletionAction,
+       (0, ReleaseRAAChannelMonitorUpdate) => {
+               (0, channel_funding_outpoint, required),
+               (2, counterparty_node_id, required),
+       };
+);
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
        /// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -932,8 +946,17 @@ where
        #[cfg(any(test, feature = "_test_utils"))]
        pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>>>,
 
+       /// The set of events which we need to give to the user to handle. In some cases an event may
+       /// require some further action after the user handles it (currently only blocking a monitor
+       /// update from being handed to the user to ensure the included changes to the channel state
+       /// are handled by the user before they're persisted durably to disk). In that case, the second
+       /// element in the tuple is set to `Some` with further details of the action.
+       ///
+       /// Note that events MUST NOT be removed from pending_events after deserialization, as they
+       /// could be in the middle of being processed without the direct mutex held.
+       ///
        /// See `ChannelManager` struct-level documentation for lock order requirements.
-       pending_events: Mutex<Vec<events::Event>>,
+       pending_events: Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
        /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
        pending_events_processor: AtomicBool,
        /// See `ChannelManager` struct-level documentation for lock order requirements.
@@ -1446,10 +1469,10 @@ macro_rules! handle_error {
                                                });
                                        }
                                        if let Some((channel_id, user_channel_id)) = chan_id {
-                                               $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed {
+                                               $self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed {
                                                        channel_id, user_channel_id,
                                                        reason: ClosureReason::ProcessingError { err: err.err.clone() }
-                                               });
+                                               }, None));
                                        }
                                }
 
@@ -1581,13 +1604,13 @@ macro_rules! send_channel_ready {
 macro_rules! emit_channel_pending_event {
        ($locked_events: expr, $channel: expr) => {
                if $channel.should_emit_channel_pending_event() {
-                       $locked_events.push(events::Event::ChannelPending {
+                       $locked_events.push_back((events::Event::ChannelPending {
                                channel_id: $channel.channel_id(),
                                former_temporary_channel_id: $channel.temporary_channel_id(),
                                counterparty_node_id: $channel.get_counterparty_node_id(),
                                user_channel_id: $channel.get_user_id(),
                                funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(),
-                       });
+                       }, None));
                        $channel.set_channel_pending_event_emitted();
                }
        }
@@ -1597,12 +1620,12 @@ macro_rules! emit_channel_ready_event {
        ($locked_events: expr, $channel: expr) => {
                if $channel.should_emit_channel_ready_event() {
                        debug_assert!($channel.channel_pending_event_emitted());
-                       $locked_events.push(events::Event::ChannelReady {
+                       $locked_events.push_back((events::Event::ChannelReady {
                                channel_id: $channel.channel_id(),
                                user_channel_id: $channel.get_user_id(),
                                counterparty_node_id: $channel.get_counterparty_node_id(),
                                channel_type: $channel.get_channel_type().clone(),
-                       });
+                       }, None));
                        $channel.set_channel_ready_event_emitted();
                }
        }
@@ -1721,7 +1744,7 @@ macro_rules! process_events_body {
                                result = NotifyOption::DoPersist;
                        }
 
-                       for event in pending_events {
+                       for (event, _action) in pending_events {
                                $event_to_handle = event;
                                $handle_event;
                        }
@@ -1802,7 +1825,7 @@ where
 
                        per_peer_state: FairRwLock::new(HashMap::new()),
 
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(VecDeque::new()),
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
@@ -2010,15 +2033,17 @@ where
                let mut pending_events_lock = self.pending_events.lock().unwrap();
                match channel.unbroadcasted_funding() {
                        Some(transaction) => {
-                               pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction })
+                               pending_events_lock.push_back((events::Event::DiscardFunding {
+                                       channel_id: channel.channel_id(), transaction
+                               }, None));
                        },
                        None => {},
                }
-               pending_events_lock.push(events::Event::ChannelClosed {
+               pending_events_lock.push_back((events::Event::ChannelClosed {
                        channel_id: channel.channel_id(),
                        user_channel_id: channel.get_user_id(),
                        reason: closure_reason
-               });
+               }, None));
        }
 
        fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
@@ -3233,7 +3258,7 @@ where
        pub fn process_pending_htlc_forwards(&self) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let mut new_events = Vec::new();
+               let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
                let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
                {
@@ -3559,7 +3584,7 @@ where
                                                                                        htlcs.push(claimable_htlc);
                                                                                        let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
                                                                                        htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
-                                                                                       new_events.push(events::Event::PaymentClaimable {
+                                                                                       new_events.push_back((events::Event::PaymentClaimable {
                                                                                                receiver_node_id: Some(receiver_node_id),
                                                                                                payment_hash,
                                                                                                purpose: purpose(),
@@ -3568,7 +3593,7 @@ where
                                                                                                via_user_channel_id: Some(prev_user_channel_id),
                                                                                                claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER),
                                                                                                onion_fields: claimable_payment.onion_fields.clone(),
-                                                                                       });
+                                                                                       }, None));
                                                                                        payment_claimable_generated = true;
                                                                                } else {
                                                                                        // Nothing to do - we haven't reached the total
@@ -3629,7 +3654,7 @@ where
                                                                                                                        htlcs: vec![claimable_htlc],
                                                                                                                });
                                                                                                                let prev_channel_id = prev_funding_outpoint.to_channel_id();
-                                                                                                               new_events.push(events::Event::PaymentClaimable {
+                                                                                                               new_events.push_back((events::Event::PaymentClaimable {
                                                                                                                        receiver_node_id: Some(receiver_node_id),
                                                                                                                        payment_hash,
                                                                                                                        amount_msat,
@@ -3638,7 +3663,7 @@ where
                                                                                                                        via_user_channel_id: Some(prev_user_channel_id),
                                                                                                                        claim_deadline,
                                                                                                                        onion_fields: Some(onion_fields),
-                                                                                                               });
+                                                                                                               }, None));
                                                                                                        },
                                                                                                        hash_map::Entry::Occupied(_) => {
                                                                                                                log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} for a duplicative payment hash", log_bytes!(payment_hash.0));
@@ -4116,10 +4141,10 @@ where
                                mem::drop(forward_htlcs);
                                if push_forward_ev { self.push_pending_forwards_ev(); }
                                let mut pending_events = self.pending_events.lock().unwrap();
-                               pending_events.push(events::Event::HTLCHandlingFailed {
+                               pending_events.push_back((events::Event::HTLCHandlingFailed {
                                        prev_channel_id: outpoint.to_channel_id(),
                                        failed_next_destination: destination,
-                               });
+                               }, None));
                        },
                }
        }
@@ -4382,13 +4407,13 @@ where
                                MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
                                        let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                                        if let Some(ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id }) = payment {
-                                               self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed {
+                                               self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
                                                        payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id),
-                                               });
+                                               }, None));
                                        }
                                },
                                MonitorUpdateCompletionAction::EmitEvent { event } => {
-                                       self.pending_events.lock().unwrap().push(event);
+                                       self.pending_events.lock().unwrap().push_back((event, None));
                                },
                        }
                }
@@ -4712,15 +4737,13 @@ where
                                        });
                                } else {
                                        let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(
-                                               events::Event::OpenChannelRequest {
-                                                       temporary_channel_id: msg.temporary_channel_id.clone(),
-                                                       counterparty_node_id: counterparty_node_id.clone(),
-                                                       funding_satoshis: msg.funding_satoshis,
-                                                       push_msat: msg.push_msat,
-                                                       channel_type: channel.get_channel_type().clone(),
-                                               }
-                                       );
+                                       pending_events.push_back((events::Event::OpenChannelRequest {
+                                               temporary_channel_id: msg.temporary_channel_id.clone(),
+                                               counterparty_node_id: counterparty_node_id.clone(),
+                                               funding_satoshis: msg.funding_satoshis,
+                                               push_msat: msg.push_msat,
+                                               channel_type: channel.get_channel_type().clone(),
+                                       }, None));
                                }
 
                                entry.insert(channel);
@@ -4748,13 +4771,13 @@ where
                        }
                };
                let mut pending_events = self.pending_events.lock().unwrap();
-               pending_events.push(events::Event::FundingGenerationReady {
+               pending_events.push_back((events::Event::FundingGenerationReady {
                        temporary_channel_id: msg.temporary_channel_id,
                        counterparty_node_id: *counterparty_node_id,
                        channel_value_satoshis: value,
                        output_script,
                        user_channel_id: user_id,
-               });
+               }, None));
                Ok(())
        }
 
@@ -5144,7 +5167,7 @@ where
        fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
                        let mut push_forward_event = false;
-                       let mut new_intercept_events = Vec::new();
+                       let mut new_intercept_events = VecDeque::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
                                for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
@@ -5171,13 +5194,13 @@ where
                                                                let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
                                                                match pending_intercepts.entry(intercept_id) {
                                                                        hash_map::Entry::Vacant(entry) => {
-                                                                               new_intercept_events.push(events::Event::HTLCIntercepted {
+                                                                               new_intercept_events.push_back((events::Event::HTLCIntercepted {
                                                                                        requested_next_hop_scid: scid,
                                                                                        payment_hash: forward_info.payment_hash,
                                                                                        inbound_amount_msat: forward_info.incoming_amt_msat.unwrap(),
                                                                                        expected_outbound_amount_msat: forward_info.outgoing_amt_msat,
                                                                                        intercept_id
-                                                                               });
+                                                                               }, None));
                                                                                entry.insert(PendingAddHTLCInfo {
                                                                                        prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info });
                                                                        },
@@ -5227,13 +5250,13 @@ where
        fn push_pending_forwards_ev(&self) {
                let mut pending_events = self.pending_events.lock().unwrap();
                let forward_ev_exists = pending_events.iter()
-                       .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
+                       .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
                        .is_some();
                if !forward_ev_exists {
-                       pending_events.push(events::Event::PendingHTLCsForwardable {
+                       pending_events.push_back((events::Event::PendingHTLCsForwardable {
                                time_forwardable:
                                        Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
-                       });
+                       }, None));
                }
        }
 
@@ -5884,13 +5907,13 @@ where
        #[cfg(feature = "_test_utils")]
        pub fn push_pending_event(&self, event: events::Event) {
                let mut events = self.pending_events.lock().unwrap();
-               events.push(event);
+               events.push_back((event, None));
        }
 
        #[cfg(test)]
        pub fn pop_pending_event(&self) -> Option<events::Event> {
                let mut events = self.pending_events.lock().unwrap();
-               if events.is_empty() { None } else { Some(events.remove(0)) }
+               events.pop_front().map(|(e, _)| e)
        }
 
        #[cfg(test)]
@@ -7227,9 +7250,19 @@ where
                }
 
                let events = self.pending_events.lock().unwrap();
-               (events.len() as u64).write(writer)?;
-               for event in events.iter() {
-                       event.write(writer)?;
+               // LDK versions prior to 0.0.115 don't support post-event actions, thus if there's no
+               // actions at all, skip writing the required TLV. Otherwise, pre-0.0.115 versions will
+               // refuse to read the new ChannelManager.
+               let events_not_backwards_compatible = events.iter().any(|(_, action)| action.is_some());
+               if events_not_backwards_compatible {
+                       // If we're gonna write a even TLV that will overwrite our events anyway we might as
+                       // well save the space and not write any events here.
+                       0u64.write(writer)?;
+               } else {
+                       (events.len() as u64).write(writer)?;
+                       for (event, _) in events.iter() {
+                               event.write(writer)?;
+                       }
                }
 
                let background_events = self.pending_background_events.lock().unwrap();
@@ -7310,6 +7343,7 @@ where
                        (5, self.our_network_pubkey, required),
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
+                       (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
                        (9, htlc_purposes, vec_type),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
@@ -7319,6 +7353,47 @@ where
        }
 }
 
+impl Writeable for VecDeque<(Event, Option<EventCompletionAction>)> {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               (self.len() as u64).write(w)?;
+               for (event, action) in self.iter() {
+                       event.write(w)?;
+                       action.write(w)?;
+                       #[cfg(debug_assertions)] {
+                               // Events are MaybeReadable, in some cases indicating that they shouldn't actually
+                               // be persisted and are regenerated on restart. However, if such an event has a
+                               // post-event-handling action we'll write nothing for the event and would have to
+                               // either forget the action or fail on deserialization (which we do below). Thus,
+                               // check that the event is sane here.
+                               let event_encoded = event.encode();
+                               let event_read: Option<Event> =
+                                       MaybeReadable::read(&mut &event_encoded[..]).unwrap();
+                               if action.is_some() { assert!(event_read.is_some()); }
+                       }
+               }
+               Ok(())
+       }
+}
+impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               let len: u64 = Readable::read(reader)?;
+               const MAX_ALLOC_SIZE: u64 = 1024 * 16;
+               let mut events: Self = VecDeque::with_capacity(cmp::min(
+                       MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>() as u64,
+                       len) as usize);
+               for _ in 0..len {
+                       let ev_opt = MaybeReadable::read(reader)?;
+                       let action = Readable::read(reader)?;
+                       if let Some(ev) = ev_opt {
+                               events.push_back((ev, action));
+                       } else if action.is_some() {
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+               Ok(events)
+       }
+}
+
 /// Arguments for the creation of a ChannelManager that are not deserialized.
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@@ -7485,7 +7560,7 @@ where
                let mut peer_channels: HashMap<PublicKey, HashMap<[u8; 32], Channel<<SP::Target as SignerProvider>::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
-               let mut channel_closures = Vec::new();
+               let mut channel_closures = VecDeque::new();
                let mut pending_background_events = Vec::new();
                for _ in 0..channel_count {
                        let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
@@ -7521,11 +7596,11 @@ where
                                                pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update));
                                        }
                                        failed_htlcs.append(&mut new_failed_htlcs);
-                                       channel_closures.push(events::Event::ChannelClosed {
+                                       channel_closures.push_back((events::Event::ChannelClosed {
                                                channel_id: channel.channel_id(),
                                                user_channel_id: channel.get_user_id(),
                                                reason: ClosureReason::OutdatedChannelManager
-                                       });
+                                       }, None));
                                        for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
                                                let mut found_htlc = false;
                                                for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
@@ -7570,11 +7645,11 @@ where
                                // was in-progress, we never broadcasted the funding transaction and can still
                                // safely discard the channel.
                                let _ = channel.force_shutdown(false);
-                               channel_closures.push(events::Event::ChannelClosed {
+                               channel_closures.push_back((events::Event::ChannelClosed {
                                        channel_id: channel.channel_id(),
                                        user_channel_id: channel.get_user_id(),
                                        reason: ClosureReason::DisconnectedPeer,
-                               });
+                               }, None));
                        } else {
                                log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id()));
                                log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
@@ -7635,10 +7710,11 @@ where
                }
 
                let event_count: u64 = Readable::read(reader)?;
-               let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
+               let mut pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)> =
+                       VecDeque::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>()));
                for _ in 0..event_count {
                        match MaybeReadable::read(reader)? {
-                               Some(event) => pending_events_read.push(event),
+                               Some(event) => pending_events_read.push_back((event, None)),
                                None => continue,
                        }
                }
@@ -7694,6 +7770,7 @@ where
                let mut claimable_htlc_onion_fields = None;
                let mut pending_claiming_payments = Some(HashMap::new());
                let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+               let mut events_override = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
@@ -7702,6 +7779,7 @@ where
                        (5, received_network_pubkey, option),
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
+                       (8, events_override, option),
                        (9, claimable_htlc_purposes, vec_type),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
@@ -7714,6 +7792,10 @@ where
                        probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
                }
 
+               if let Some(events) = events_override {
+                       pending_events_read = events;
+               }
+
                if !channel_closures.is_empty() {
                        pending_events_read.append(&mut channel_closures);
                }
@@ -7809,7 +7891,7 @@ where
                                                                        if pending_forward_matches_htlc(&htlc_info) {
                                                                                log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
                                                                                        log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
-                                                                               pending_events_read.retain(|event| {
+                                                                               pending_events_read.retain(|(event, _)| {
                                                                                        if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
                                                                                                intercepted_id != ev_id
                                                                                        } else { true }
@@ -7845,9 +7927,9 @@ where
                        // shut down before the timer hit. Either way, set the time_forwardable to a small
                        // constant as enough time has likely passed that we should simply handle the forwards
                        // now, or at least after the user gets a chance to reconnect to our peers.
-                       pending_events_read.push(events::Event::PendingHTLCsForwardable {
+                       pending_events_read.push_back((events::Event::PendingHTLCsForwardable {
                                time_forwardable: Duration::from_secs(2),
-                       });
+                       }, None));
                }
 
                let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
@@ -8001,12 +8083,12 @@ where
                                                        previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger);
                                                }
                                        }
-                                       pending_events_read.push(events::Event::PaymentClaimed {
+                                       pending_events_read.push_back((events::Event::PaymentClaimed {
                                                receiver_node_id,
                                                payment_hash,
                                                purpose: payment.purpose,
                                                amount_msat: claimable_amt_msat,
-                                       });
+                                       }, None));
                                }
                        }
                }
index 5270ed35d8835d6f240e8ae0fcd90b12e0851685..f363a6d60ffa7b877057c5c19b568aecd6a419d7 100644 (file)
@@ -16,7 +16,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey};
 use crate::chain::keysinterface::{EntropySource, NodeSigner, Recipient};
 use crate::events::{self, PaymentFailureReason};
 use crate::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
-use crate::ln::channelmanager::{ChannelDetails, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
+use crate::ln::channelmanager::{ChannelDetails, EventCompletionAction, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
 use crate::ln::onion_utils::HTLCFailReason;
 use crate::routing::router::{InFlightHtlcs, Path, PaymentParameters, Route, RouteParameters, Router};
 use crate::util::errors::APIError;
@@ -487,7 +487,7 @@ impl OutboundPayments {
                retry_strategy: Retry, route_params: RouteParameters, router: &R,
                first_hops: Vec<ChannelDetails>, compute_inflight_htlcs: IH, entropy_source: &ES,
                node_signer: &NS, best_block_height: u32, logger: &L,
-               pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP,
        ) -> Result<(), RetryableSendFailure>
        where
                R::Target: Router,
@@ -525,7 +525,7 @@ impl OutboundPayments {
                payment_id: PaymentId, retry_strategy: Retry, route_params: RouteParameters, router: &R,
                first_hops: Vec<ChannelDetails>, inflight_htlcs: IH, entropy_source: &ES,
                node_signer: &NS, best_block_height: u32, logger: &L,
-               pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP
        ) -> Result<PaymentHash, RetryableSendFailure>
        where
                R::Target: Router,
@@ -575,7 +575,8 @@ impl OutboundPayments {
 
        pub(super) fn check_retry_payments<R: Deref, ES: Deref, NS: Deref, SP, IH, FH, L: Deref>(
                &self, router: &R, first_hops: FH, inflight_htlcs: IH, entropy_source: &ES, node_signer: &NS,
-               best_block_height: u32, pending_events: &Mutex<Vec<events::Event>>, logger: &L,
+               best_block_height: u32,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, logger: &L,
                send_payment_along_path: SP,
        )
        where
@@ -617,11 +618,11 @@ impl OutboundPayments {
                        if !pmt.is_auto_retryable_now() && pmt.remaining_parts() == 0 {
                                pmt.mark_abandoned(PaymentFailureReason::RetriesExhausted);
                                if let PendingOutboundPayment::Abandoned { payment_hash, reason, .. } = pmt {
-                                       pending_events.lock().unwrap().push(events::Event::PaymentFailed {
+                                       pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
                                                payment_id: *pmt_id,
                                                payment_hash: *payment_hash,
                                                reason: *reason,
-                                       });
+                                       }, None));
                                        retain = false;
                                }
                        }
@@ -645,7 +646,7 @@ impl OutboundPayments {
                keysend_preimage: Option<PaymentPreimage>, retry_strategy: Retry, route_params: RouteParameters,
                router: &R, first_hops: Vec<ChannelDetails>, inflight_htlcs: IH, entropy_source: &ES,
                node_signer: &NS, best_block_height: u32, logger: &L,
-               pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP,
        ) -> Result<(), RetryableSendFailure>
        where
                R::Target: Router,
@@ -686,7 +687,7 @@ impl OutboundPayments {
                &self, payment_hash: PaymentHash, payment_id: PaymentId, route_params: RouteParameters,
                router: &R, first_hops: Vec<ChannelDetails>, inflight_htlcs: &IH, entropy_source: &ES,
                node_signer: &NS, best_block_height: u32, logger: &L,
-               pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: &SP,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: &SP,
        )
        where
                R::Target: Router,
@@ -736,11 +737,11 @@ impl OutboundPayments {
                                $payment.get_mut().mark_abandoned($reason);
                                if let PendingOutboundPayment::Abandoned { reason, .. } = $payment.get() {
                                        if $payment.get().remaining_parts() == 0 {
-                                               pending_events.lock().unwrap().push(events::Event::PaymentFailed {
+                                               pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
                                                        payment_id,
                                                        payment_hash,
                                                        reason: *reason,
-                                               });
+                                               }, None));
                                                $payment.remove();
                                        }
                                }
@@ -808,7 +809,7 @@ impl OutboundPayments {
                &self, err: PaymentSendFailure, payment_id: PaymentId, payment_hash: PaymentHash, route: Route,
                mut route_params: RouteParameters, router: &R, first_hops: Vec<ChannelDetails>,
                inflight_htlcs: &IH, entropy_source: &ES, node_signer: &NS, best_block_height: u32, logger: &L,
-               pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: &SP,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: &SP,
        )
        where
                R::Target: Router,
@@ -851,7 +852,8 @@ impl OutboundPayments {
 
        fn push_path_failed_evs_and_scids<I: ExactSizeIterator + Iterator<Item = Result<(), APIError>>, L: Deref>(
                payment_id: PaymentId, payment_hash: PaymentHash, route_params: &mut RouteParameters,
-               paths: Vec<Path>, path_results: I, logger: &L, pending_events: &Mutex<Vec<events::Event>>
+               paths: Vec<Path>, path_results: I, logger: &L,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
        ) where L::Target: Logger {
                let mut events = pending_events.lock().unwrap();
                debug_assert_eq!(paths.len(), path_results.len());
@@ -865,7 +867,7 @@ impl OutboundPayments {
                                        failed_scid = Some(scid);
                                        route_params.payment_params.previously_failed_channels.push(scid);
                                }
-                               events.push(events::Event::PaymentPathFailed {
+                               events.push_back((events::Event::PaymentPathFailed {
                                        payment_id: Some(payment_id),
                                        payment_hash,
                                        payment_failed_permanently: false,
@@ -876,7 +878,7 @@ impl OutboundPayments {
                                        error_code: None,
                                        #[cfg(test)]
                                        error_data: None,
-                               });
+                               }, None));
                        }
                }
        }
@@ -1112,7 +1114,9 @@ impl OutboundPayments {
 
        pub(super) fn claim_htlc<L: Deref>(
                &self, payment_id: PaymentId, payment_preimage: PaymentPreimage, session_priv: SecretKey,
-               path: Path, from_onchain: bool, pending_events: &Mutex<Vec<events::Event>>, logger: &L
+               path: Path, from_onchain: bool,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
+               logger: &L,
        ) where L::Target: Logger {
                let mut session_priv_bytes = [0; 32];
                session_priv_bytes.copy_from_slice(&session_priv[..]);
@@ -1122,14 +1126,12 @@ impl OutboundPayments {
                        if !payment.get().is_fulfilled() {
                                let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
                                let fee_paid_msat = payment.get().get_pending_fee_msat();
-                               pending_events.push(
-                                       events::Event::PaymentSent {
-                                               payment_id: Some(payment_id),
-                                               payment_preimage,
-                                               payment_hash,
-                                               fee_paid_msat,
-                                       }
-                               );
+                               pending_events.push_back((events::Event::PaymentSent {
+                                       payment_id: Some(payment_id),
+                                       payment_preimage,
+                                       payment_hash,
+                                       fee_paid_msat,
+                               }, None));
                                payment.get_mut().mark_fulfilled();
                        }
 
@@ -1142,13 +1144,11 @@ impl OutboundPayments {
                                // irrevocably fulfilled.
                                if payment.get_mut().remove(&session_priv_bytes, Some(&path)) {
                                        let payment_hash = Some(PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()));
-                                       pending_events.push(
-                                               events::Event::PaymentPathSuccessful {
-                                                       payment_id,
-                                                       payment_hash,
-                                                       path,
-                                               }
-                                       );
+                                       pending_events.push_back((events::Event::PaymentPathSuccessful {
+                                               payment_id,
+                                               payment_hash,
+                                               path,
+                                       }, None));
                                }
                        }
                } else {
@@ -1156,7 +1156,9 @@ impl OutboundPayments {
                }
        }
 
-       pub(super) fn finalize_claims(&self, sources: Vec<HTLCSource>, pending_events: &Mutex<Vec<events::Event>>) {
+       pub(super) fn finalize_claims(&self, sources: Vec<HTLCSource>,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>)
+       {
                let mut outbounds = self.pending_outbound_payments.lock().unwrap();
                let mut pending_events = pending_events.lock().unwrap();
                for source in sources {
@@ -1166,20 +1168,20 @@ impl OutboundPayments {
                                if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
                                        assert!(payment.get().is_fulfilled());
                                        if payment.get_mut().remove(&session_priv_bytes, None) {
-                                               pending_events.push(
-                                                       events::Event::PaymentPathSuccessful {
-                                                               payment_id,
-                                                               payment_hash: payment.get().payment_hash(),
-                                                               path,
-                                                       }
-                                               );
+                                               pending_events.push_back((events::Event::PaymentPathSuccessful {
+                                                       payment_id,
+                                                       payment_hash: payment.get().payment_hash(),
+                                                       path,
+                                               }, None));
                                        }
                                }
                        }
                }
        }
 
-       pub(super) fn remove_stale_resolved_payments(&self, pending_events: &Mutex<Vec<events::Event>>) {
+       pub(super) fn remove_stale_resolved_payments(&self,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>)
+       {
                // If an outbound payment was completed, and no pending HTLCs remain, we should remove it
                // from the map. However, if we did that immediately when the last payment HTLC is claimed,
                // this could race the user making a duplicate send_payment call and our idempotency
@@ -1193,7 +1195,7 @@ impl OutboundPayments {
                        if let PendingOutboundPayment::Fulfilled { session_privs, timer_ticks_without_htlcs, .. } = payment {
                                let mut no_remaining_entries = session_privs.is_empty();
                                if no_remaining_entries {
-                                       for ev in pending_events.iter() {
+                                       for (ev, _) in pending_events.iter() {
                                                match ev {
                                                        events::Event::PaymentSent { payment_id: Some(ev_payment_id), .. } |
                                                                events::Event::PaymentPathSuccessful { payment_id: ev_payment_id, .. } |
@@ -1221,8 +1223,9 @@ impl OutboundPayments {
        // Returns a bool indicating whether a PendingHTLCsForwardable event should be generated.
        pub(super) fn fail_htlc<L: Deref>(
                &self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason,
-               path: &Path, session_priv: &SecretKey, payment_id: &PaymentId, probing_cookie_secret: [u8; 32],
-               secp_ctx: &Secp256k1<secp256k1::All>, pending_events: &Mutex<Vec<events::Event>>, logger: &L
+               path: &Path, session_priv: &SecretKey, payment_id: &PaymentId,
+               probing_cookie_secret: [u8; 32], secp_ctx: &Secp256k1<secp256k1::All>,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, logger: &L,
        ) -> bool where L::Target: Logger {
                #[cfg(test)]
                let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_error.decode_onion_failure(secp_ctx, logger, &source);
@@ -1334,24 +1337,25 @@ impl OutboundPayments {
                        }
                };
                let mut pending_events = pending_events.lock().unwrap();
-               pending_events.push(path_failure);
-               if let Some(ev) = full_failure_ev { pending_events.push(ev); }
+               pending_events.push_back((path_failure, None));
+               if let Some(ev) = full_failure_ev { pending_events.push_back((ev, None)); }
                pending_retry_ev
        }
 
        pub(super) fn abandon_payment(
-               &self, payment_id: PaymentId, reason: PaymentFailureReason, pending_events: &Mutex<Vec<events::Event>>
+               &self, payment_id: PaymentId, reason: PaymentFailureReason,
+               pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>
        ) {
                let mut outbounds = self.pending_outbound_payments.lock().unwrap();
                if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
                        payment.get_mut().mark_abandoned(reason);
                        if let PendingOutboundPayment::Abandoned { payment_hash, reason, .. } = payment.get() {
                                if payment.get().remaining_parts() == 0 {
-                                       pending_events.lock().unwrap().push(events::Event::PaymentFailed {
+                                       pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
                                                payment_id,
                                                payment_hash: *payment_hash,
                                                reason: *reason,
-                                       });
+                                       }, None));
                                        payment.remove();
                                }
                        }
@@ -1435,6 +1439,8 @@ mod tests {
        use crate::util::errors::APIError;
        use crate::util::test_utils;
 
+       use alloc::collections::VecDeque;
+
        #[test]
        #[cfg(feature = "std")]
        fn fails_paying_after_expiration() {
@@ -1460,7 +1466,7 @@ mod tests {
                        payment_params,
                        final_value_msat: 0,
                };
-               let pending_events = Mutex::new(Vec::new());
+               let pending_events = Mutex::new(VecDeque::new());
                if on_retry {
                        outbound_payments.add_new_pending_payment(PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(),
                                PaymentId([0; 32]), None, &Route { paths: vec![], payment_params: None },
@@ -1472,7 +1478,7 @@ mod tests {
                                &pending_events, &|_, _, _, _, _, _, _, _| Ok(()));
                        let events = pending_events.lock().unwrap();
                        assert_eq!(events.len(), 1);
-                       if let Event::PaymentFailed { ref reason, .. } = events[0] {
+                       if let Event::PaymentFailed { ref reason, .. } = events[0].0 {
                                assert_eq!(reason.unwrap(), PaymentFailureReason::PaymentExpired);
                        } else { panic!("Unexpected event"); }
                } else {
@@ -1508,7 +1514,7 @@ mod tests {
                router.expect_find_route(route_params.clone(),
                        Err(LightningError { err: String::new(), action: ErrorAction::IgnoreError }));
 
-               let pending_events = Mutex::new(Vec::new());
+               let pending_events = Mutex::new(VecDeque::new());
                if on_retry {
                        outbound_payments.add_new_pending_payment(PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(),
                                PaymentId([0; 32]), None, &Route { paths: vec![], payment_params: None },
@@ -1520,7 +1526,7 @@ mod tests {
                                &pending_events, &|_, _, _, _, _, _, _, _| Ok(()));
                        let events = pending_events.lock().unwrap();
                        assert_eq!(events.len(), 1);
-                       if let Event::PaymentFailed { .. } = events[0] { } else { panic!("Unexpected event"); }
+                       if let Event::PaymentFailed { .. } = events[0].0 { } else { panic!("Unexpected event"); }
                } else {
                        let err = outbound_payments.send_payment(
                                PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(), PaymentId([0; 32]),
@@ -1570,7 +1576,7 @@ mod tests {
 
                // Ensure that a ChannelUnavailable error will result in blaming an scid in the
                // PaymentPathFailed event.
-               let pending_events = Mutex::new(Vec::new());
+               let pending_events = Mutex::new(VecDeque::new());
                outbound_payments.send_payment(
                        PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(), PaymentId([0; 32]),
                        Retry::Attempts(0), route_params.clone(), &&router, vec![], || InFlightHtlcs::new(),
@@ -1581,11 +1587,11 @@ mod tests {
                assert_eq!(events.len(), 2);
                if let Event::PaymentPathFailed {
                        short_channel_id,
-                       failure: PathFailure::InitialSend { err: APIError::ChannelUnavailable { .. }}, .. } = events[0]
+                       failure: PathFailure::InitialSend { err: APIError::ChannelUnavailable { .. }}, .. } = events[0].0
                {
                        assert_eq!(short_channel_id, Some(failed_scid));
                } else { panic!("Unexpected event"); }
-               if let Event::PaymentFailed { .. } = events[1] { } else { panic!("Unexpected event"); }
+               if let Event::PaymentFailed { .. } = events[1].0 { } else { panic!("Unexpected event"); }
                events.clear();
                core::mem::drop(events);
 
@@ -1608,10 +1614,10 @@ mod tests {
                assert_eq!(events.len(), 2);
                if let Event::PaymentPathFailed {
                        short_channel_id,
-                       failure: PathFailure::InitialSend { err: APIError::APIMisuseError { .. }}, .. } = events[0]
+                       failure: PathFailure::InitialSend { err: APIError::APIMisuseError { .. }}, .. } = events[0].0
                {
                        assert_eq!(short_channel_id, None);
                } else { panic!("Unexpected event"); }
-               if let Event::PaymentFailed { .. } = events[1] { } else { panic!("Unexpected event"); }
+               if let Event::PaymentFailed { .. } = events[1].0 { } else { panic!("Unexpected event"); }
        }
 }