Track an `EventCompletionAction` for after an `Event` is processed
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 42211408984c2aeb20cc20dec2a48dd58686db3e..dc824be7b7d0532885cce8c36f38cb2fbd57a910 100644 (file)
@@ -521,6 +521,20 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
        (2, EmitEvent) => { (0, event, upgradable_required) },
 );
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) enum EventCompletionAction {
+       ReleaseRAAChannelMonitorUpdate {
+               counterparty_node_id: PublicKey,
+               channel_funding_outpoint: OutPoint,
+       },
+}
+impl_writeable_tlv_based_enum!(EventCompletionAction,
+       (0, ReleaseRAAChannelMonitorUpdate) => {
+               (0, channel_funding_outpoint, required),
+               (2, counterparty_node_id, required),
+       };
+);
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
        /// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -932,8 +946,17 @@ where
        #[cfg(any(test, feature = "_test_utils"))]
        pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>>>,
 
+       /// The set of events which we need to give to the user to handle. In some cases an event may
+       /// require some further action after the user handles it (currently only blocking a monitor
+       /// update from being handed to the user to ensure the included changes to the channel state
+       /// are handled by the user before they're persisted durably to disk). In that case, the second
+       /// element in the tuple is set to `Some` with further details of the action.
+       ///
+       /// Note that events MUST NOT be removed from pending_events after deserialization, as they
+       /// could be in the middle of being processed without the direct mutex held.
+       ///
        /// See `ChannelManager` struct-level documentation for lock order requirements.
-       pending_events: Mutex<Vec<events::Event>>,
+       pending_events: Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
        /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
        pending_events_processor: AtomicBool,
        /// See `ChannelManager` struct-level documentation for lock order requirements.
@@ -1446,10 +1469,10 @@ macro_rules! handle_error {
                                                });
                                        }
                                        if let Some((channel_id, user_channel_id)) = chan_id {
-                                               $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed {
+                                               $self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed {
                                                        channel_id, user_channel_id,
                                                        reason: ClosureReason::ProcessingError { err: err.err.clone() }
-                                               });
+                                               }, None));
                                        }
                                }
 
@@ -1581,13 +1604,13 @@ macro_rules! send_channel_ready {
 macro_rules! emit_channel_pending_event {
        ($locked_events: expr, $channel: expr) => {
                if $channel.should_emit_channel_pending_event() {
-                       $locked_events.push(events::Event::ChannelPending {
+                       $locked_events.push_back((events::Event::ChannelPending {
                                channel_id: $channel.channel_id(),
                                former_temporary_channel_id: $channel.temporary_channel_id(),
                                counterparty_node_id: $channel.get_counterparty_node_id(),
                                user_channel_id: $channel.get_user_id(),
                                funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(),
-                       });
+                       }, None));
                        $channel.set_channel_pending_event_emitted();
                }
        }
@@ -1597,12 +1620,12 @@ macro_rules! emit_channel_ready_event {
        ($locked_events: expr, $channel: expr) => {
                if $channel.should_emit_channel_ready_event() {
                        debug_assert!($channel.channel_pending_event_emitted());
-                       $locked_events.push(events::Event::ChannelReady {
+                       $locked_events.push_back((events::Event::ChannelReady {
                                channel_id: $channel.channel_id(),
                                user_channel_id: $channel.get_user_id(),
                                counterparty_node_id: $channel.get_counterparty_node_id(),
                                channel_type: $channel.get_channel_type().clone(),
-                       });
+                       }, None));
                        $channel.set_channel_ready_event_emitted();
                }
        }
@@ -1721,7 +1744,7 @@ macro_rules! process_events_body {
                                result = NotifyOption::DoPersist;
                        }
 
-                       for event in pending_events {
+                       for (event, _action) in pending_events {
                                $event_to_handle = event;
                                $handle_event;
                        }
@@ -1802,7 +1825,7 @@ where
 
                        per_peer_state: FairRwLock::new(HashMap::new()),
 
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(VecDeque::new()),
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
@@ -2010,15 +2033,17 @@ where
                let mut pending_events_lock = self.pending_events.lock().unwrap();
                match channel.unbroadcasted_funding() {
                        Some(transaction) => {
-                               pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction })
+                               pending_events_lock.push_back((events::Event::DiscardFunding {
+                                       channel_id: channel.channel_id(), transaction
+                               }, None));
                        },
                        None => {},
                }
-               pending_events_lock.push(events::Event::ChannelClosed {
+               pending_events_lock.push_back((events::Event::ChannelClosed {
                        channel_id: channel.channel_id(),
                        user_channel_id: channel.get_user_id(),
                        reason: closure_reason
-               });
+               }, None));
        }
 
        fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
@@ -3233,7 +3258,7 @@ where
        pub fn process_pending_htlc_forwards(&self) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let mut new_events = Vec::new();
+               let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
                let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
                {
@@ -3559,7 +3584,7 @@ where
                                                                                        htlcs.push(claimable_htlc);
                                                                                        let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
                                                                                        htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
-                                                                                       new_events.push(events::Event::PaymentClaimable {
+                                                                                       new_events.push_back((events::Event::PaymentClaimable {
                                                                                                receiver_node_id: Some(receiver_node_id),
                                                                                                payment_hash,
                                                                                                purpose: purpose(),
@@ -3568,7 +3593,7 @@ where
                                                                                                via_user_channel_id: Some(prev_user_channel_id),
                                                                                                claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER),
                                                                                                onion_fields: claimable_payment.onion_fields.clone(),
-                                                                                       });
+                                                                                       }, None));
                                                                                        payment_claimable_generated = true;
                                                                                } else {
                                                                                        // Nothing to do - we haven't reached the total
@@ -3629,7 +3654,7 @@ where
                                                                                                                        htlcs: vec![claimable_htlc],
                                                                                                                });
                                                                                                                let prev_channel_id = prev_funding_outpoint.to_channel_id();
-                                                                                                               new_events.push(events::Event::PaymentClaimable {
+                                                                                                               new_events.push_back((events::Event::PaymentClaimable {
                                                                                                                        receiver_node_id: Some(receiver_node_id),
                                                                                                                        payment_hash,
                                                                                                                        amount_msat,
@@ -3638,7 +3663,7 @@ where
                                                                                                                        via_user_channel_id: Some(prev_user_channel_id),
                                                                                                                        claim_deadline,
                                                                                                                        onion_fields: Some(onion_fields),
-                                                                                                               });
+                                                                                                               }, None));
                                                                                                        },
                                                                                                        hash_map::Entry::Occupied(_) => {
                                                                                                                log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} for a duplicative payment hash", log_bytes!(payment_hash.0));
@@ -4116,10 +4141,10 @@ where
                                mem::drop(forward_htlcs);
                                if push_forward_ev { self.push_pending_forwards_ev(); }
                                let mut pending_events = self.pending_events.lock().unwrap();
-                               pending_events.push(events::Event::HTLCHandlingFailed {
+                               pending_events.push_back((events::Event::HTLCHandlingFailed {
                                        prev_channel_id: outpoint.to_channel_id(),
                                        failed_next_destination: destination,
-                               });
+                               }, None));
                        },
                }
        }
@@ -4382,13 +4407,13 @@ where
                                MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
                                        let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                                        if let Some(ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id }) = payment {
-                                               self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed {
+                                               self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
                                                        payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id),
-                                               });
+                                               }, None));
                                        }
                                },
                                MonitorUpdateCompletionAction::EmitEvent { event } => {
-                                       self.pending_events.lock().unwrap().push(event);
+                                       self.pending_events.lock().unwrap().push_back((event, None));
                                },
                        }
                }
@@ -4712,15 +4737,13 @@ where
                                        });
                                } else {
                                        let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(
-                                               events::Event::OpenChannelRequest {
-                                                       temporary_channel_id: msg.temporary_channel_id.clone(),
-                                                       counterparty_node_id: counterparty_node_id.clone(),
-                                                       funding_satoshis: msg.funding_satoshis,
-                                                       push_msat: msg.push_msat,
-                                                       channel_type: channel.get_channel_type().clone(),
-                                               }
-                                       );
+                                       pending_events.push_back((events::Event::OpenChannelRequest {
+                                               temporary_channel_id: msg.temporary_channel_id.clone(),
+                                               counterparty_node_id: counterparty_node_id.clone(),
+                                               funding_satoshis: msg.funding_satoshis,
+                                               push_msat: msg.push_msat,
+                                               channel_type: channel.get_channel_type().clone(),
+                                       }, None));
                                }
 
                                entry.insert(channel);
@@ -4748,13 +4771,13 @@ where
                        }
                };
                let mut pending_events = self.pending_events.lock().unwrap();
-               pending_events.push(events::Event::FundingGenerationReady {
+               pending_events.push_back((events::Event::FundingGenerationReady {
                        temporary_channel_id: msg.temporary_channel_id,
                        counterparty_node_id: *counterparty_node_id,
                        channel_value_satoshis: value,
                        output_script,
                        user_channel_id: user_id,
-               });
+               }, None));
                Ok(())
        }
 
@@ -5144,7 +5167,7 @@ where
        fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
                        let mut push_forward_event = false;
-                       let mut new_intercept_events = Vec::new();
+                       let mut new_intercept_events = VecDeque::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
                                for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
@@ -5171,13 +5194,13 @@ where
                                                                let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
                                                                match pending_intercepts.entry(intercept_id) {
                                                                        hash_map::Entry::Vacant(entry) => {
-                                                                               new_intercept_events.push(events::Event::HTLCIntercepted {
+                                                                               new_intercept_events.push_back((events::Event::HTLCIntercepted {
                                                                                        requested_next_hop_scid: scid,
                                                                                        payment_hash: forward_info.payment_hash,
                                                                                        inbound_amount_msat: forward_info.incoming_amt_msat.unwrap(),
                                                                                        expected_outbound_amount_msat: forward_info.outgoing_amt_msat,
                                                                                        intercept_id
-                                                                               });
+                                                                               }, None));
                                                                                entry.insert(PendingAddHTLCInfo {
                                                                                        prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info });
                                                                        },
@@ -5227,13 +5250,13 @@ where
        fn push_pending_forwards_ev(&self) {
                let mut pending_events = self.pending_events.lock().unwrap();
                let forward_ev_exists = pending_events.iter()
-                       .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
+                       .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
                        .is_some();
                if !forward_ev_exists {
-                       pending_events.push(events::Event::PendingHTLCsForwardable {
+                       pending_events.push_back((events::Event::PendingHTLCsForwardable {
                                time_forwardable:
                                        Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
-                       });
+                       }, None));
                }
        }
 
@@ -5884,13 +5907,13 @@ where
        #[cfg(feature = "_test_utils")]
        pub fn push_pending_event(&self, event: events::Event) {
                let mut events = self.pending_events.lock().unwrap();
-               events.push(event);
+               events.push_back((event, None));
        }
 
        #[cfg(test)]
        pub fn pop_pending_event(&self) -> Option<events::Event> {
                let mut events = self.pending_events.lock().unwrap();
-               if events.is_empty() { None } else { Some(events.remove(0)) }
+               events.pop_front().map(|(e, _)| e)
        }
 
        #[cfg(test)]
@@ -7227,9 +7250,19 @@ where
                }
 
                let events = self.pending_events.lock().unwrap();
-               (events.len() as u64).write(writer)?;
-               for event in events.iter() {
-                       event.write(writer)?;
+               // LDK versions prior to 0.0.115 don't support post-event actions, thus if there's no
+               // actions at all, skip writing the required TLV. Otherwise, pre-0.0.115 versions will
+               // refuse to read the new ChannelManager.
+               let events_not_backwards_compatible = events.iter().any(|(_, action)| action.is_some());
+               if events_not_backwards_compatible {
+                       // If we're gonna write a even TLV that will overwrite our events anyway we might as
+                       // well save the space and not write any events here.
+                       0u64.write(writer)?;
+               } else {
+                       (events.len() as u64).write(writer)?;
+                       for (event, _) in events.iter() {
+                               event.write(writer)?;
+                       }
                }
 
                let background_events = self.pending_background_events.lock().unwrap();
@@ -7310,6 +7343,7 @@ where
                        (5, self.our_network_pubkey, required),
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
+                       (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
                        (9, htlc_purposes, vec_type),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
@@ -7319,6 +7353,47 @@ where
        }
 }
 
+impl Writeable for VecDeque<(Event, Option<EventCompletionAction>)> {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               (self.len() as u64).write(w)?;
+               for (event, action) in self.iter() {
+                       event.write(w)?;
+                       action.write(w)?;
+                       #[cfg(debug_assertions)] {
+                               // Events are MaybeReadable, in some cases indicating that they shouldn't actually
+                               // be persisted and are regenerated on restart. However, if such an event has a
+                               // post-event-handling action we'll write nothing for the event and would have to
+                               // either forget the action or fail on deserialization (which we do below). Thus,
+                               // check that the event is sane here.
+                               let event_encoded = event.encode();
+                               let event_read: Option<Event> =
+                                       MaybeReadable::read(&mut &event_encoded[..]).unwrap();
+                               if action.is_some() { assert!(event_read.is_some()); }
+                       }
+               }
+               Ok(())
+       }
+}
+impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               let len: u64 = Readable::read(reader)?;
+               const MAX_ALLOC_SIZE: u64 = 1024 * 16;
+               let mut events: Self = VecDeque::with_capacity(cmp::min(
+                       MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>() as u64,
+                       len) as usize);
+               for _ in 0..len {
+                       let ev_opt = MaybeReadable::read(reader)?;
+                       let action = Readable::read(reader)?;
+                       if let Some(ev) = ev_opt {
+                               events.push_back((ev, action));
+                       } else if action.is_some() {
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+               Ok(events)
+       }
+}
+
 /// Arguments for the creation of a ChannelManager that are not deserialized.
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@@ -7485,7 +7560,7 @@ where
                let mut peer_channels: HashMap<PublicKey, HashMap<[u8; 32], Channel<<SP::Target as SignerProvider>::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
-               let mut channel_closures = Vec::new();
+               let mut channel_closures = VecDeque::new();
                let mut pending_background_events = Vec::new();
                for _ in 0..channel_count {
                        let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
@@ -7521,11 +7596,11 @@ where
                                                pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update));
                                        }
                                        failed_htlcs.append(&mut new_failed_htlcs);
-                                       channel_closures.push(events::Event::ChannelClosed {
+                                       channel_closures.push_back((events::Event::ChannelClosed {
                                                channel_id: channel.channel_id(),
                                                user_channel_id: channel.get_user_id(),
                                                reason: ClosureReason::OutdatedChannelManager
-                                       });
+                                       }, None));
                                        for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
                                                let mut found_htlc = false;
                                                for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
@@ -7570,11 +7645,11 @@ where
                                // was in-progress, we never broadcasted the funding transaction and can still
                                // safely discard the channel.
                                let _ = channel.force_shutdown(false);
-                               channel_closures.push(events::Event::ChannelClosed {
+                               channel_closures.push_back((events::Event::ChannelClosed {
                                        channel_id: channel.channel_id(),
                                        user_channel_id: channel.get_user_id(),
                                        reason: ClosureReason::DisconnectedPeer,
-                               });
+                               }, None));
                        } else {
                                log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id()));
                                log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
@@ -7635,10 +7710,11 @@ where
                }
 
                let event_count: u64 = Readable::read(reader)?;
-               let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
+               let mut pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)> =
+                       VecDeque::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>()));
                for _ in 0..event_count {
                        match MaybeReadable::read(reader)? {
-                               Some(event) => pending_events_read.push(event),
+                               Some(event) => pending_events_read.push_back((event, None)),
                                None => continue,
                        }
                }
@@ -7694,6 +7770,7 @@ where
                let mut claimable_htlc_onion_fields = None;
                let mut pending_claiming_payments = Some(HashMap::new());
                let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+               let mut events_override = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
@@ -7702,6 +7779,7 @@ where
                        (5, received_network_pubkey, option),
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
+                       (8, events_override, option),
                        (9, claimable_htlc_purposes, vec_type),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
@@ -7714,6 +7792,10 @@ where
                        probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
                }
 
+               if let Some(events) = events_override {
+                       pending_events_read = events;
+               }
+
                if !channel_closures.is_empty() {
                        pending_events_read.append(&mut channel_closures);
                }
@@ -7809,7 +7891,7 @@ where
                                                                        if pending_forward_matches_htlc(&htlc_info) {
                                                                                log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
                                                                                        log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
-                                                                               pending_events_read.retain(|event| {
+                                                                               pending_events_read.retain(|(event, _)| {
                                                                                        if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
                                                                                                intercepted_id != ev_id
                                                                                        } else { true }
@@ -7845,9 +7927,9 @@ where
                        // shut down before the timer hit. Either way, set the time_forwardable to a small
                        // constant as enough time has likely passed that we should simply handle the forwards
                        // now, or at least after the user gets a chance to reconnect to our peers.
-                       pending_events_read.push(events::Event::PendingHTLCsForwardable {
+                       pending_events_read.push_back((events::Event::PendingHTLCsForwardable {
                                time_forwardable: Duration::from_secs(2),
-                       });
+                       }, None));
                }
 
                let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
@@ -8001,12 +8083,12 @@ where
                                                        previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger);
                                                }
                                        }
-                                       pending_events_read.push(events::Event::PaymentClaimed {
+                                       pending_events_read.push_back((events::Event::PaymentClaimed {
                                                receiver_node_id,
                                                payment_hash,
                                                purpose: payment.purpose,
                                                amount_msat: claimable_amt_msat,
-                                       });
+                                       }, None));
                                }
                        }
                }