/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
+struct PendingChannelMonitorUpdate {
+ update: ChannelMonitorUpdate,
+ /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
+ /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
+ /// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
+ ///
+ /// [`ChannelManager`]: super::channelmanager::ChannelManager
+ blocked: bool,
+}
+
// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
- pending_monitor_updates: Vec<ChannelMonitorUpdate>,
+ pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
}
#[cfg(any(test, fuzzing))]
}
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
+ let release_cs_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
- UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
- let mut additional_update = self.build_commitment_no_status_check(logger);
- // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
- // strictly increasing by one, so decrement it here.
- self.latest_monitor_update_id = monitor_update.update_id;
- monitor_update.updates.append(&mut additional_update.updates);
- self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
- self.pending_monitor_updates.push(monitor_update);
+ UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
+ // Even if we aren't supposed to let new monitor updates with commitment state
+ // updates run, we still need to push the preimage ChannelMonitorUpdateStep no
+ // matter what. Sadly, to push a new monitor update which flies before others
+ // already queued, we have to insert it into the pending queue and update the
+ // update_ids of all the following monitors.
+ let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+ let mut additional_update = self.build_commitment_no_status_check(logger);
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them
+ // to be strictly increasing by one, so decrement it here.
+ self.latest_monitor_update_id = monitor_update.update_id;
+ monitor_update.updates.append(&mut additional_update.updates);
+ self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+ update: monitor_update, blocked: false,
+ });
+ self.pending_monitor_updates.len() - 1
+ } else {
+ let insert_pos = self.pending_monitor_updates.iter().position(|upd| upd.blocked)
+ .unwrap_or(self.pending_monitor_updates.len());
+ let new_mon_id = self.pending_monitor_updates.get(insert_pos)
+ .map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
+ monitor_update.update_id = new_mon_id;
+ self.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
+ update: monitor_update, blocked: false,
+ });
+ for held_update in self.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+ held_update.update.update_id += 1;
+ }
+ if msg.is_some() {
+ debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
+ let update = self.build_commitment_no_status_check(logger);
+ self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+ update, blocked: true,
+ });
+ }
+ insert_pos
+ };
+ self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
UpdateFulfillCommitFetch::NewClaim {
- monitor_update: self.pending_monitor_updates.last().unwrap(),
+ monitor_update: &self.pending_monitor_updates.get(unblocked_update_pos)
+ .expect("We just pushed the monitor update").update,
htlc_value_msat,
}
},
- UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
- self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
- self.pending_monitor_updates.push(monitor_update);
- UpdateFulfillCommitFetch::NewClaim {
- monitor_update: self.pending_monitor_updates.last().unwrap(),
- htlc_value_msat,
- }
- }
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
}
Ok(())
}
- pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
+ pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
where L::Target: Logger
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
}
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
log_bytes!(self.channel_id));
- self.pending_monitor_updates.push(monitor_update);
- return Ok(self.pending_monitor_updates.last().unwrap());
+ return Ok(self.push_ret_blockable_mon_update(monitor_update));
}
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
- self.pending_monitor_updates.push(monitor_update);
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
- return Ok(self.pending_monitor_updates.last().unwrap());
+ return Ok(self.push_ret_blockable_mon_update(monitor_update));
}
/// Public version of the below, checking relevant preconditions first.
update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
- self.pending_monitor_updates.push(monitor_update);
- (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
+ (self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail)
} else {
(None, Vec::new())
}
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
- pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
+ pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
where L::Target: Logger,
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
self.monitor_pending_failures.append(&mut revoked_htlcs);
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
- self.pending_monitor_updates.push(monitor_update);
- return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
+ return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update)));
}
match self.free_holding_cell_htlcs(logger) {
(Some(_), htlcs_to_fail) => {
- let mut additional_update = self.pending_monitor_updates.pop().unwrap();
+ let mut additional_update = self.pending_monitor_updates.pop().unwrap().update;
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
- self.pending_monitor_updates.push(monitor_update);
- Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
+ Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
},
(None, htlcs_to_fail) => {
if require_commitment {
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
- self.pending_monitor_updates.push(monitor_update);
- Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
+ Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
} else {
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
- self.pending_monitor_updates.push(monitor_update);
- Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
+ Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
}
}
}
{
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
- self.pending_monitor_updates.clear();
+ let mut found_blocked = false;
+ self.pending_monitor_updates.retain(|upd| {
+ if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
+ if upd.blocked { found_blocked = true; }
+ upd.blocked
+ });
// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
- self.pending_monitor_updates.push(monitor_update);
- Some(self.pending_monitor_updates.last().unwrap())
+ if self.push_blockable_mon_update(monitor_update) {
+ self.pending_monitor_updates.last().map(|upd| &upd.update)
+ } else { None }
} else { None };
let shutdown = if send_shutdown {
Some(msgs::Shutdown {
(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
}
- pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
- self.pending_monitor_updates.first()
+ /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
+ /// further blocked monitor update exists after the next.
+ pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
+ for i in 0..self.pending_monitor_updates.len() {
+ if self.pending_monitor_updates[i].blocked {
+ self.pending_monitor_updates[i].blocked = false;
+ return Some((&self.pending_monitor_updates[i].update,
+ self.pending_monitor_updates.len() > i + 1));
+ }
+ }
+ None
+ }
+
+ /// Pushes a new monitor update into our monitor update queue, returning whether it should be
+ /// immediately given to the user for persisting or if it should be held as blocked.
+ fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
+ let release_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+ self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+ update, blocked: !release_monitor
+ });
+ release_monitor
+ }
+
+ /// Pushes a new monitor update into our monitor update queue, returning a reference to it if
+ /// it should be immediately given to the user for persisting or `None` if it should be held as
+ /// blocked.
+ fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
+ -> Option<&ChannelMonitorUpdate> {
+ let release_monitor = self.push_blockable_mon_update(update);
+ if release_monitor { self.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
+ }
+
+ pub fn no_monitor_updates_pending(&self) -> bool {
+ self.pending_monitor_updates.is_empty()
+ }
+
+ pub fn complete_one_mon_update(&mut self, update_id: u64) {
+ self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
}
/// Returns true if funding_created was sent/received.
Some(_) => {
let monitor_update = self.build_commitment_no_status_check(logger);
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
- self.pending_monitor_updates.push(monitor_update);
- Ok(Some(self.pending_monitor_updates.last().unwrap()))
+ Ok(self.push_ret_blockable_mon_update(monitor_update))
},
None => Ok(None)
}
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
- self.pending_monitor_updates.push(monitor_update);
- Some(self.pending_monitor_updates.last().unwrap())
+ if self.push_blockable_mon_update(monitor_update) {
+ self.pending_monitor_updates.last().map(|upd| &upd.update)
+ } else { None }
} else { None };
let shutdown = msgs::Shutdown {
channel_id: self.channel_id,
res
},
ChannelMonitorUpdateStatus::Completed => {
- if ($update_id == 0 || $chan.get_next_monitor_update()
- .expect("We can't be processing a monitor update if it isn't queued")
- .update_id == $update_id) &&
- $chan.get_latest_monitor_update_id() == $update_id
- {
+ $chan.complete_one_mon_update($update_id);
+ if $chan.no_monitor_updates_pending() {
handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
}
Ok(())
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan) => {
let funding_txo = chan.get().get_funding_txo();
- let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
- let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
- let update_id = monitor_update.update_id;
- handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
- peer_state, per_peer_state, chan)
+ let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
+ if let Some(monitor_update) = monitor_update_opt {
+ let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+ let update_id = monitor_update.update_id;
+ handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+ peer_state, per_peer_state, chan)
+ } else { Ok(()) }
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan) => {
let funding_txo = chan.get().get_funding_txo();
- let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
- let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
- let update_id = monitor_update.update_id;
- let res = handle_new_monitor_update!(self, update_res, update_id,
- peer_state_lock, peer_state, per_peer_state, chan);
+ let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+ let res = if let Some(monitor_update) = monitor_update_opt {
+ let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+ let update_id = monitor_update.update_id;
+ handle_new_monitor_update!(self, update_res, update_id,
+ peer_state_lock, peer_state, per_peer_state, chan)
+ } else { Ok(()) };
(htlcs_to_fail, res)
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))