}
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
- let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+ let release_cs_monitor = self.context.pending_monitor_updates.is_empty();
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
// Even if we aren't supposed to let new monitor updates with commitment state
// matter what. Sadly, to push a new monitor update which flies before others
// already queued, we have to insert it into the pending queue and update the
// update_ids of all the following monitors.
- let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+ if release_cs_monitor && msg.is_some() {
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them
// to be strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
- self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
- update: monitor_update, blocked: false,
- });
- self.context.pending_monitor_updates.len() - 1
} else {
- let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
- .unwrap_or(self.context.pending_monitor_updates.len());
- let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
+ let new_mon_id = self.context.pending_monitor_updates.get(0)
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
monitor_update.update_id = new_mon_id;
- self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
- update: monitor_update, blocked: false,
- });
- for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+ for held_update in self.context.pending_monitor_updates.iter_mut() {
held_update.update.update_id += 1;
}
if msg.is_some() {
update, blocked: true,
});
}
- insert_pos
- };
- self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
- UpdateFulfillCommitFetch::NewClaim {
- monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos)
- .expect("We just pushed the monitor update").update.clone(),
- htlc_value_msat,
}
+
+ self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
+ UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
},
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
}
match self.free_holding_cell_htlcs(logger) {
- (Some(_), htlcs_to_fail) => {
- let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
+ (Some(mut additional_update), htlcs_to_fail) => {
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
{
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
- let mut found_blocked = false;
- self.context.pending_monitor_updates.retain(|upd| {
- if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
- if upd.blocked { found_blocked = true; }
- upd.blocked
- });
+ for upd in self.context.pending_monitor_updates.iter() {
+ debug_assert!(upd.blocked);
+ }
// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
/// further blocked monitor update exists after the next.
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
- for i in 0..self.context.pending_monitor_updates.len() {
- if self.context.pending_monitor_updates[i].blocked {
- self.context.pending_monitor_updates[i].blocked = false;
- return Some((self.context.pending_monitor_updates[i].update.clone(),
- self.context.pending_monitor_updates.len() > i + 1));
- }
+ for upd in self.context.pending_monitor_updates.iter() {
+ debug_assert!(upd.blocked);
}
- None
+ if self.context.pending_monitor_updates.is_empty() { return None; }
+ Some((self.context.pending_monitor_updates.remove(0).update,
+ !self.context.pending_monitor_updates.is_empty()))
}
/// Pushes a new monitor update into our monitor update queue, returning it if it should be
/// immediately given to the user for persisting or `None` if it should be held as blocked.
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
-> Option<ChannelMonitorUpdate> {
- let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
- self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
- update, blocked: !release_monitor,
- });
- if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None }
- }
-
- pub fn no_monitor_updates_pending(&self) -> bool {
- self.context.pending_monitor_updates.is_empty()
- }
-
- pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
- self.context.pending_monitor_updates.retain(|upd| {
- if upd.update.update_id <= update_id {
- assert!(!upd.blocked, "Completed update must have flown");
- false
- } else { true }
- });
- }
-
- pub fn complete_one_mon_update(&mut self, update_id: u64) {
- self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
+ let release_monitor = self.context.pending_monitor_updates.is_empty();
+ if !release_monitor {
+ self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+ update, blocked: true,
+ });
+ None
+ } else {
+ Some(update)
+ }
}
- /// Returns an iterator over all unblocked monitor updates which have not yet completed.
- pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
- self.context.pending_monitor_updates.iter()
- .filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
+ pub fn blocked_monitor_updates_pending(&self) -> usize {
+ self.context.pending_monitor_updates.len()
}
/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.