Move in-flight `ChannelMonitorUpdate`s to `ChannelManager`
[rust-lightning] / lightning / src / ln / channel.rs
index 26b70f286257a447ca0487819833c710303370df..d68108b3155dc6e03988eb4be6be2e9b662f4331 100644 (file)
@@ -2264,7 +2264,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        }
 
        pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
-               let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+               let release_cs_monitor = self.context.pending_monitor_updates.is_empty();
                match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
                        UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
                                // Even if we aren't supposed to let new monitor updates with commitment state
@@ -2272,26 +2272,17 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                // matter what. Sadly, to push a new monitor update which flies before others
                                // already queued, we have to insert it into the pending queue and update the
                                // update_ids of all the following monitors.
-                               let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+                               if release_cs_monitor && msg.is_some() {
                                        let mut additional_update = self.build_commitment_no_status_check(logger);
                                        // build_commitment_no_status_check may bump latest_monitor_id but we want them
                                        // to be strictly increasing by one, so decrement it here.
                                        self.context.latest_monitor_update_id = monitor_update.update_id;
                                        monitor_update.updates.append(&mut additional_update.updates);
-                                       self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-                                               update: monitor_update, blocked: false,
-                                       });
-                                       self.context.pending_monitor_updates.len() - 1
                                } else {
-                                       let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
-                                               .unwrap_or(self.context.pending_monitor_updates.len());
-                                       let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
+                                       let new_mon_id = self.context.pending_monitor_updates.get(0)
                                                .map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
                                        monitor_update.update_id = new_mon_id;
-                                       self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
-                                               update: monitor_update, blocked: false,
-                                       });
-                                       for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+                                       for held_update in self.context.pending_monitor_updates.iter_mut() {
                                                held_update.update.update_id += 1;
                                        }
                                        if msg.is_some() {
@@ -2301,14 +2292,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                                        update, blocked: true,
                                                });
                                        }
-                                       insert_pos
-                               };
-                               self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
-                               UpdateFulfillCommitFetch::NewClaim {
-                                       monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos)
-                                               .expect("We just pushed the monitor update").update.clone(),
-                                       htlc_value_msat,
                                }
+
+                               self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
+                               UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
                        },
                        UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
                }
@@ -3349,8 +3336,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }
 
                match self.free_holding_cell_htlcs(logger) {
-                       (Some(_), htlcs_to_fail) => {
-                               let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
+                       (Some(mut additional_update), htlcs_to_fail) => {
                                // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
                                // strictly increasing by one, so decrement it here.
                                self.context.latest_monitor_update_id = monitor_update.update_id;
@@ -3566,12 +3552,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        {
                assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
                self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
-               let mut found_blocked = false;
-               self.context.pending_monitor_updates.retain(|upd| {
-                       if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
-                       if upd.blocked { found_blocked = true; }
-                       upd.blocked
-               });
+               for upd in self.context.pending_monitor_updates.iter() {
+                       debug_assert!(upd.blocked);
+               }
 
                // If we're past (or at) the FundingSent stage on an outbound channel, try to
                // (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4439,48 +4422,31 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
        /// further blocked monitor update exists after the next.
        pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
-               for i in 0..self.context.pending_monitor_updates.len() {
-                       if self.context.pending_monitor_updates[i].blocked {
-                               self.context.pending_monitor_updates[i].blocked = false;
-                               return Some((self.context.pending_monitor_updates[i].update.clone(),
-                                       self.context.pending_monitor_updates.len() > i + 1));
-                       }
+               for upd in self.context.pending_monitor_updates.iter() {
+                       debug_assert!(upd.blocked);
                }
-               None
+               if self.context.pending_monitor_updates.is_empty() { return None; }
+               Some((self.context.pending_monitor_updates.remove(0).update,
+                       !self.context.pending_monitor_updates.is_empty()))
        }
 
        /// Pushes a new monitor update into our monitor update queue, returning it if it should be
        /// immediately given to the user for persisting or `None` if it should be held as blocked.
        fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
        -> Option<ChannelMonitorUpdate> {
-               let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
-               self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-                       update, blocked: !release_monitor,
-               });
-               if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None }
-       }
-
-       pub fn no_monitor_updates_pending(&self) -> bool {
-               self.context.pending_monitor_updates.is_empty()
-       }
-
-       pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
-               self.context.pending_monitor_updates.retain(|upd| {
-                       if upd.update.update_id <= update_id {
-                               assert!(!upd.blocked, "Completed update must have flown");
-                               false
-                       } else { true }
-               });
-       }
-
-       pub fn complete_one_mon_update(&mut self, update_id: u64) {
-               self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
+               let release_monitor = self.context.pending_monitor_updates.is_empty();
+               if !release_monitor {
+                       self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+                               update, blocked: true,
+                       });
+                       None
+               } else {
+                       Some(update)
+               }
        }
 
-       /// Returns an iterator over all unblocked monitor updates which have not yet completed.
-       pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
-               self.context.pending_monitor_updates.iter()
-                       .filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
+       pub fn blocked_monitor_updates_pending(&self) -> usize {
+               self.context.pending_monitor_updates.len()
        }
 
        /// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.