From ac5efa275539c5b7cd89665e6b5d9f2afaf8f945 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 15 Mar 2023 23:16:06 +0000 Subject: [PATCH] Allow holding `ChannelMonitorUpdate`s until later, completing one In the coming commits, we need to delay `ChannelMonitorUpdate`s until future actions (specifically `Event` handling). However, because we should only notify users once of a given `ChannelMonitorUpdate` and they must be provided in-order, we need to track which ones have or have not been given to users and, once updating resumes, fly the ones that haven't already made it to users. To do this we simply add a `bool` in the `ChannelMonitorUpdate` set stored in the `Channel` which indicates if an update flew and decline to provide new updates back to the `ChannelManager` if any updates have their flown bit unset. Further, because we'll now be releasing `ChannelMonitorUpdate`s which were already stored in the pending list, we now need to support getting a `Completed` result for a monitor which isn't the only pending monitor (or even out of order), thus we also rewrite the way monitor updates are marked completed. 
--- lightning/src/ln/chanmon_update_fail_tests.rs | 2 +- lightning/src/ln/channel.rs | 157 +++++++++++++----- lightning/src/ln/channelmanager.rs | 31 ++-- lightning/src/ln/functional_tests.rs | 4 +- 4 files changed, 132 insertions(+), 62 deletions(-) diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 3f210cb46..d495910ee 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -146,7 +146,7 @@ fn test_monitor_and_persister_update_fail() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2); - if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Check that even though the persister is returning a InProgress, // because the update is bogus, ultimately the error that's returned // should be a PermanentFailure. diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 8b5e89d94..de26fa462 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -479,6 +479,16 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4; /// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5; +struct PendingChannelMonitorUpdate { + update: ChannelMonitorUpdate, + /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an + /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is + /// blocked on some external event and the [`ChannelManager`] will update us when we're ready. 
+ /// + /// [`ChannelManager`]: super::channelmanager::ChannelManager + blocked: bool, +} + // TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking // has been completed, and then turn into a Channel to get compiler-time enforcement of things like // calling channel_id() before we're set up or things like get_outbound_funding_signed on an @@ -744,7 +754,7 @@ pub(super) struct Channel { /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence /// completes we still need to be able to complete the persistence. Thus, we have to keep a /// copy of the [`ChannelMonitorUpdate`] here until it is complete. - pending_monitor_updates: Vec, + pending_monitor_updates: Vec, } #[cfg(any(test, fuzzing))] @@ -1979,28 +1989,52 @@ impl Channel { } pub fn get_update_fulfill_htlc_and_commit(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger { + let release_cs_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked); match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) { - UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => { - let mut additional_update = self.build_commitment_no_status_check(logger); - // build_commitment_no_status_check may bump latest_monitor_id but we want them to be - // strictly increasing by one, so decrement it here. - self.latest_monitor_update_id = monitor_update.update_id; - monitor_update.updates.append(&mut additional_update.updates); - self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); - self.pending_monitor_updates.push(monitor_update); + UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => { + // Even if we aren't supposed to let new monitor updates with commitment state + // updates run, we still need to push the preimage ChannelMonitorUpdateStep no + // matter what. 
Sadly, to push a new monitor update which flies before others + // already queued, we have to insert it into the pending queue and update the + // update_ids of all the following monitors. + let unblocked_update_pos = if release_cs_monitor && msg.is_some() { + let mut additional_update = self.build_commitment_no_status_check(logger); + // build_commitment_no_status_check may bump latest_monitor_id but we want them + // to be strictly increasing by one, so decrement it here. + self.latest_monitor_update_id = monitor_update.update_id; + monitor_update.updates.append(&mut additional_update.updates); + self.pending_monitor_updates.push(PendingChannelMonitorUpdate { + update: monitor_update, blocked: false, + }); + self.pending_monitor_updates.len() - 1 + } else { + let insert_pos = self.pending_monitor_updates.iter().position(|upd| upd.blocked) + .unwrap_or(self.pending_monitor_updates.len()); + let new_mon_id = self.pending_monitor_updates.get(insert_pos) + .map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id); + monitor_update.update_id = new_mon_id; + self.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate { + update: monitor_update, blocked: false, + }); + for held_update in self.pending_monitor_updates.iter_mut().skip(insert_pos + 1) { + held_update.update.update_id += 1; + } + if msg.is_some() { + debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set"); + let update = self.build_commitment_no_status_check(logger); + self.pending_monitor_updates.push(PendingChannelMonitorUpdate { + update, blocked: true, + }); + } + insert_pos + }; + self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new()); UpdateFulfillCommitFetch::NewClaim { - monitor_update: self.pending_monitor_updates.last().unwrap(), + monitor_update: &self.pending_monitor_updates.get(unblocked_update_pos) + .expect("We just pushed the monitor update").update, htlc_value_msat, } }, - 
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => { - self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); - self.pending_monitor_updates.push(monitor_update); - UpdateFulfillCommitFetch::NewClaim { - monitor_update: self.pending_monitor_updates.last().unwrap(), - htlc_value_msat, - } - } UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {}, } } @@ -3068,7 +3102,7 @@ impl Channel { Ok(()) } - pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError> + pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result, ChannelError> where L::Target: Logger { if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { @@ -3268,8 +3302,7 @@ impl Channel { } log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id)); - self.pending_monitor_updates.push(monitor_update); - return Ok(self.pending_monitor_updates.last().unwrap()); + return Ok(self.push_ret_blockable_mon_update(monitor_update)); } let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { @@ -3286,9 +3319,8 @@ impl Channel { log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.", log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" }); - self.pending_monitor_updates.push(monitor_update); self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new()); - return Ok(self.pending_monitor_updates.last().unwrap()); + return Ok(self.push_ret_blockable_mon_update(monitor_update)); } /// Public version of the below, checking relevant 
preconditions first. @@ -3403,8 +3435,7 @@ impl Channel { update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len()); self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); - self.pending_monitor_updates.push(monitor_update); - (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail) + (self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail) } else { (None, Vec::new()) } @@ -3415,7 +3446,7 @@ impl Channel { /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail, /// generating an appropriate error *after* the channel state has been updated based on the /// revoke_and_ack message. - pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError> + pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError> where L::Target: Logger, { if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) { @@ -3612,21 +3643,19 @@ impl Channel { self.monitor_pending_failures.append(&mut revoked_htlcs); self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs); log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id())); - self.pending_monitor_updates.push(monitor_update); - return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap())); + return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update))); } match self.free_holding_cell_htlcs(logger) { (Some(_), htlcs_to_fail) => { - let mut additional_update = self.pending_monitor_updates.pop().unwrap(); + let mut additional_update = self.pending_monitor_updates.pop().unwrap().update; // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to 
be // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); - self.pending_monitor_updates.push(monitor_update); - Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) + Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update))) }, (None, htlcs_to_fail) => { if require_commitment { @@ -3640,13 +3669,11 @@ impl Channel { log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.", log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len()); self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); - self.pending_monitor_updates.push(monitor_update); - Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) + Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update))) } else { log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id())); self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); - self.pending_monitor_updates.push(monitor_update); - Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap())) + Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update))) } } } @@ -3835,7 +3862,12 @@ impl Channel { { assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32); self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32); - self.pending_monitor_updates.clear(); + let mut found_blocked = false; + self.pending_monitor_updates.retain(|upd| { + if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); } + 
if upd.blocked { found_blocked = true; } + upd.blocked + }); // If we're past (or at) the FundingSent stage on an outbound channel, try to // (re-)broadcast the funding transaction as we may have declined to broadcast it when we @@ -4378,8 +4410,9 @@ impl Channel { }], }; self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); - self.pending_monitor_updates.push(monitor_update); - Some(self.pending_monitor_updates.last().unwrap()) + if self.push_blockable_mon_update(monitor_update) { + self.pending_monitor_updates.last().map(|upd| &upd.update) + } else { None } } else { None }; let shutdown = if send_shutdown { Some(msgs::Shutdown { @@ -4951,8 +4984,44 @@ impl Channel { (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 } - pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> { - self.pending_monitor_updates.first() + /// Returns the next blocked monitor update, if one exists, and a bool which indicates a + /// further blocked monitor update exists after the next. + pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> { + for i in 0..self.pending_monitor_updates.len() { + if self.pending_monitor_updates[i].blocked { + self.pending_monitor_updates[i].blocked = false; + return Some((&self.pending_monitor_updates[i].update, + self.pending_monitor_updates.len() > i + 1)); + } + } + None + } + + /// Pushes a new monitor update into our monitor update queue, returning whether it should be + /// immediately given to the user for persisting or if it should be held as blocked. 
+ fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool { + let release_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked); + self.pending_monitor_updates.push(PendingChannelMonitorUpdate { + update, blocked: !release_monitor + }); + release_monitor + } + + /// Pushes a new monitor update into our monitor update queue, returning a reference to it if + /// it should be immediately given to the user for persisting or `None` if it should be held as + /// blocked. + fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) + -> Option<&ChannelMonitorUpdate> { + let release_monitor = self.push_blockable_mon_update(update); + if release_monitor { self.pending_monitor_updates.last().map(|upd| &upd.update) } else { None } + } + + pub fn no_monitor_updates_pending(&self) -> bool { + self.pending_monitor_updates.is_empty() + } + + pub fn complete_one_mon_update(&mut self, update_id: u64) { + self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id); } /// Returns true if funding_created was sent/received. 
@@ -6000,8 +6069,7 @@ impl Channel { Some(_) => { let monitor_update = self.build_commitment_no_status_check(logger); self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new()); - self.pending_monitor_updates.push(monitor_update); - Ok(Some(self.pending_monitor_updates.last().unwrap())) + Ok(self.push_ret_blockable_mon_update(monitor_update)) }, None => Ok(None) } @@ -6090,8 +6158,9 @@ impl Channel { }], }; self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new()); - self.pending_monitor_updates.push(monitor_update); - Some(self.pending_monitor_updates.last().unwrap()) + if self.push_blockable_mon_update(monitor_update) { + self.pending_monitor_updates.last().map(|upd| &upd.update) + } else { None } } else { None }; let shutdown = msgs::Shutdown { channel_id: self.channel_id, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 498813de7..422114089 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1680,11 +1680,8 @@ macro_rules! 
handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - if ($update_id == 0 || $chan.get_next_monitor_update() - .expect("We can't be processing a monitor update if it isn't queued") - .update_id == $update_id) && - $chan.get_latest_monitor_update_id() == $update_id - { + $chan.complete_one_mon_update($update_id); + if $chan.no_monitor_updates_pending() { handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); } Ok(()) @@ -5131,11 +5128,13 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().get_funding_txo(); - let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); + if let Some(monitor_update) = monitor_update_opt { + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, per_peer_state, chan) + } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5250,11 +5249,13 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().get_funding_txo(); - let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - let res = handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let res = if let Some(monitor_update) = monitor_update_opt { + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, + peer_state_lock, peer_state, per_peer_state, chan) + } else { Ok(()) }; (htlcs_to_fail, res) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 2b8c8b8e3..8d08be057 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -8449,7 +8449,7 @@ fn test_update_err_monitor_lockdown() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2); - if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure); assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed); } else { assert!(false); } @@ -8549,7 +8549,7 @@ fn test_concurrent_monitor_claim() { let mut node_0_per_peer_lock; let mut node_0_peer_state_lock; let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2); - if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { + if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Watchtower Alice should already have seen the block and reject the update assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure); assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed); -- 2.39.5