Handle `EventCompletionAction`s after events complete
authorMatt Corallo <git@bluematt.me>
Thu, 16 Mar 2023 03:33:20 +0000 (03:33 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 2 May 2023 17:59:22 +0000 (17:59 +0000)
This adds handling of the new `EventCompletionAction`s after
`Event`s are handled, letting `ChannelMonitorUpdate`s which were
blocked fly after a relevant `Event`.

lightning/src/ln/channelmanager.rs

index dc824be7b7d0532885cce8c36f38cb2fbd57a910..2e17664ea882d9a61e1b939fe70862365449a5d2 100644 (file)
@@ -1744,9 +1744,14 @@ macro_rules! process_events_body {
                                result = NotifyOption::DoPersist;
                        }
 
-                       for (event, _action) in pending_events {
+                       let mut post_event_actions = Vec::new();
+
+                       for (event, action_opt) in pending_events {
                                $event_to_handle = event;
                                $handle_event;
+                               if let Some(action) = action_opt {
+                                       post_event_actions.push(action);
+                               }
                        }
 
                        {
@@ -1756,6 +1761,12 @@ macro_rules! process_events_body {
                                $self.pending_events_processor.store(false, Ordering::Release);
                        }
 
+                       if !post_event_actions.is_empty() {
+                               $self.handle_post_event_actions(post_event_actions);
+                               // If we had some actions, go around again as we may have more events now
+                               processed_all_events = false;
+                       }
+
                        if result == NotifyOption::DoPersist {
                                $self.persistence_notifier.notify();
                        }
@@ -5926,6 +5937,72 @@ where
                self.pending_outbound_payments.clear_pending_payments()
        }
 
+       fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+               let mut errors = Vec::new();
+               loop {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+                               let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+                               let peer_state = &mut *peer_state_lck;
+                               if self.pending_events.lock().unwrap().iter()
+                                       .any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+                                               channel_funding_outpoint, counterparty_node_id
+                                       }))
+                               {
+                                       // Check that, while holding the peer lock, we don't have another event
+                                       // blocking any monitor updates for this channel. If we do, let those
+                                       // events be the ones that ultimately release the monitor update(s).
+                                       log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+                                               log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
+                                       break;
+                               }
+                               if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
+                                       debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
+                                       if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
+                                               log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
+                                                       log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
+                                               let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update);
+                                               let update_id = monitor_update.update_id;
+                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id,
+                                                       peer_state_lck, peer_state, per_peer_state, chan)
+                                               {
+                                                       errors.push((e, counterparty_node_id));
+                                               }
+                                               if further_update_exists {
+                                                       // If there are more `ChannelMonitorUpdate`s to process, restart at the
+                                                       // top of the loop.
+                                                       continue;
+                                               }
+                                       } else {
+                                               log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update",
+                                                       log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
+                                       }
+                               }
+                       } else {
+                               log_debug!(self.logger,
+                                       "Got a release post-RAA monitor update for peer {} but the channel is gone",
+                                       log_pubkey!(counterparty_node_id));
+                       }
+                       break;
+               }
+               for (err, counterparty_node_id) in errors {
+                       let res = Err::<(), _>(err);
+                       let _ = handle_error!(self, res, counterparty_node_id);
+               }
+       }
+
+       fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
+               for action in actions {
+                       match action {
+                               EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+                                       channel_funding_outpoint, counterparty_node_id
+                               } => {
+                                       self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+                               }
+                       }
+               }
+       }
+
        /// Processes any events asynchronously in the order they were generated since the last call
        /// using the given event handler.
        ///