}
/// The return type of get_update_fulfill_htlc_and_commit.
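+///
+/// Illustrative caller sketch (hypothetical bindings `chan`, `htlc_id`, `preimage`, and
+/// `logger`; not part of this patch):
+/// ```ignore
+/// match chan.get_update_fulfill_htlc_and_commit(htlc_id, preimage, &logger) {
+///     UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat } => {
+///         // Persist `monitor_update`; the update_fulfill/commitment_signed messages are
+///         // regenerated once the monitor update completes.
+///     },
+///     UpdateFulfillCommitFetch::DuplicateClaim {} => {}, // nothing new to persist
+/// }
+/// ```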
-pub enum UpdateFulfillCommitFetch {
+pub enum UpdateFulfillCommitFetch<'a> {
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
/// previously placed in the holding cell (and has since been removed).
NewClaim {
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
- monitor_update: ChannelMonitorUpdate,
+ monitor_update: &'a ChannelMonitorUpdate,
/// The value of the HTLC which was claimed, in msat.
htlc_value_msat: u64,
- /// The update_fulfill message and commitment_signed message (if the claim was not placed
- /// in the holding cell).
- msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
},
/// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell
/// or has been forgotten (presumably previously claimed).
DuplicateClaim {},
}
-/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
-/// state.
-pub(super) struct RAAUpdates {
- pub commitment_update: Option<msgs::CommitmentUpdate>,
- pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
- pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
- pub finalized_claimed_htlcs: Vec<HTLCSource>,
- pub monitor_update: ChannelMonitorUpdate,
- pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
-}
-
/// The return value of `monitor_updating_restored`
pub(super) struct MonitorRestoreUpdates {
pub raa: Option<msgs::RevokeAndACK>,
monitor_pending_channel_ready: bool,
monitor_pending_revoke_and_ack: bool,
monitor_pending_commitment_signed: bool,
+
+ // TODO: If a channel is drop'd, we don't know whether the `ChannelMonitor` is ultimately
+ // responsible for some of the HTLCs here or not - we don't know whether the update in question
+ // completed or not. We currently ignore these fields entirely when force-closing a channel,
+ // but need to handle this somehow or we run the risk of losing HTLCs!
monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
monitor_pending_finalized_fulfills: Vec<HTLCSource>,
/// The unique identifier used to re-derive the private key material for the channel through
/// [`SignerProvider::derive_channel_signer`].
channel_keys_id: [u8; 32],
+
+ /// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
+ /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
+ /// completes, we still need to be able to complete that persistence on restart. Thus, we keep a
+ /// copy of each [`ChannelMonitorUpdate`] here until it has been persisted.
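+ ///
+ /// A sketch of the lifecycle as of this patch: updates are pushed here by the various message
+ /// handlers (which also call [`Self::monitor_updating_paused`]), surfaced to the
+ /// `ChannelManager` by reference, and cleared once [`Self::monitor_updating_restored`] runs.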
+ pending_monitor_updates: Vec<ChannelMonitorUpdate>,
}
#[cfg(any(test, fuzzing))]
channel_type,
channel_keys_id,
+
+ pending_monitor_updates: Vec::new(),
})
}
channel_type,
channel_keys_id,
+
+ pending_monitor_updates: Vec::new(),
};
Ok(chan)
}
}
- pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
+ pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
- UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
- let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
- Err(e) => return Err((e, monitor_update)),
- Ok(res) => res
- };
- // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+ UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
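+ // Note that we no longer return the update_fulfill/commitment_signed messages here; they
+ // are regenerated via the monitor-update restore flow once the queued update completes.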
+ let mut additional_update = self.build_commitment_no_status_check(logger);
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
- Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
+ self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+ self.pending_monitor_updates.push(monitor_update);
+ UpdateFulfillCommitFetch::NewClaim {
+ monitor_update: self.pending_monitor_updates.last().unwrap(),
+ htlc_value_msat,
+ }
},
- UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
- Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
- UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
+ UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
+ self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+ self.pending_monitor_updates.push(monitor_update);
+ UpdateFulfillCommitFetch::NewClaim {
+ monitor_update: self.pending_monitor_updates.last().unwrap(),
+ htlc_value_msat,
+ }
+ }
+ UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
}
pub fn funding_created<SP: Deref, L: Deref>(
&mut self, msg: &msgs::FundingCreated, best_block: BestBlock, signer_provider: &SP, logger: &L
- ) -> Result<(msgs::FundingSigned, ChannelMonitor<<SP::Target as SignerProvider>::Signer>, Option<msgs::ChannelReady>), ChannelError>
+ ) -> Result<(msgs::FundingSigned, ChannelMonitor<Signer>), ChannelError>
where
- SP::Target: SignerProvider,
+ SP::Target: SignerProvider<Signer = Signer>,
L::Target: Logger
{
if self.is_outbound() {
log_info!(logger, "Generated funding_signed for peer for channel {}", log_bytes!(self.channel_id()));
+ let need_channel_ready = self.check_get_channel_ready(0).is_some();
+ self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new());
+
Ok((msgs::FundingSigned {
channel_id: self.channel_id,
signature
- }, channel_monitor, self.check_get_channel_ready(0)))
+ }, channel_monitor))
}
/// Handles a funding_signed message from the remote end.
/// If this call is successful, broadcast the funding transaction (and not before!)
pub fn funding_signed<SP: Deref, L: Deref>(
&mut self, msg: &msgs::FundingSigned, best_block: BestBlock, signer_provider: &SP, logger: &L
- ) -> Result<(ChannelMonitor<<SP::Target as SignerProvider>::Signer>, Transaction, Option<msgs::ChannelReady>), ChannelError>
+ ) -> Result<ChannelMonitor<Signer>, ChannelError>
where
- SP::Target: SignerProvider,
+ SP::Target: SignerProvider<Signer = Signer>,
L::Target: Logger
{
if !self.is_outbound() {
log_info!(logger, "Received funding_signed from peer for channel {}", log_bytes!(self.channel_id()));
- Ok((channel_monitor, self.funding_transaction.as_ref().cloned().unwrap(), self.check_get_channel_ready(0)))
+ let need_channel_ready = self.check_get_channel_ready(0).is_some();
+ self.monitor_updating_paused(false, false, need_channel_ready, Vec::new(), Vec::new(), Vec::new());
+ Ok(channel_monitor)
}
/// Handles a channel_ready message from our peer. If we've already sent our channel_ready
Ok(())
}
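+ /// Handles an incoming `commitment_signed` message, queueing and returning the
+ /// [`ChannelMonitorUpdate`] which must complete before we reply; our `revoke_and_ack` (and
+ /// any counter-`commitment_signed`) is regenerated once that update completes.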
- pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
+ pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
where L::Target: Logger
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
- return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
+ return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()));
}
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
- return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())));
+ return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()));
}
if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() {
- return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())));
+ return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()));
}
let funding_script = self.get_funding_redeemscript();
log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction),
log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id()));
if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) {
- return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned())));
+ return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned()));
}
bitcoin_tx.txid
};
debug_assert!(!self.is_outbound());
let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000;
if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat {
- return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())));
+ return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()));
}
}
#[cfg(any(test, fuzzing))]
}
if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs {
- return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))));
+ return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)));
}
// TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update
log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()),
encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id()));
if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) {
- return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())));
+ return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()));
}
htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source));
} else {
self.counterparty_funding_pubkey()
);
- let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx);
self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages)
- .map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?;
- let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1);
+ .map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?;
// Update state now that we've passed all the can-fail calls...
let mut need_commitment = false;
self.cur_holder_commitment_transaction_number -= 1;
// Note that if we need_commitment & !AwaitingRemoteRevoke we'll call
- // send_commitment_no_status_check() next which will reset this to RAAFirst.
+ // build_commitment_no_status_check() next which will reset this to RAAFirst.
self.resend_order = RAACommitmentOrder::CommitmentFirst;
if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 {
// the corresponding HTLC status updates so that get_last_commitment_update
// includes the right HTLCs.
self.monitor_pending_commitment_signed = true;
- let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
- // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+ let mut additional_update = self.build_commitment_no_status_check(logger);
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
}
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
log_bytes!(self.channel_id));
- return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
+ self.pending_monitor_updates.push(monitor_update);
+ return Ok(self.pending_monitor_updates.last().unwrap());
}
- let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
+ let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
// If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
// we'll send one right away when we get the revoke_and_ack when we
// free_holding_cell_htlcs().
- let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
- // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+ let mut additional_update = self.build_commitment_no_status_check(logger);
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
- Some(msg)
- } else { None };
+ true
+ } else { false };
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
- log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
-
- Ok((msgs::RevokeAndACK {
- channel_id: self.channel_id,
- per_commitment_secret,
- next_per_commitment_point,
- }, commitment_signed, monitor_update))
+ log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
+ self.pending_monitor_updates.push(monitor_update);
+ self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
+ return Ok(self.pending_monitor_updates.last().unwrap());
}
/// Public version of the below, checking relevant preconditions first.
/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
/// returns `(None, Vec::new())`.
- pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+ pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
if self.channel_state >= ChannelState::ChannelReady as u32 &&
(self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
self.free_holding_cell_htlcs(logger)
- } else { Ok((None, Vec::new())) }
+ } else { (None, Vec::new()) }
}
/// Frees any pending commitment updates in the holding cell, generating the relevant messages
/// for our counterparty.
- fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+ fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
}
}
if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
- return Ok((None, htlcs_to_fail));
+ return (None, htlcs_to_fail);
}
let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
self.send_update_fee(feerate, false, logger)
None
};
- let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
- // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+ let mut additional_update = self.build_commitment_no_status_check(logger);
+ // build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
// but we want them to be strictly increasing by one, so reset it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
- Ok((Some((msgs::CommitmentUpdate {
- update_add_htlcs,
- update_fulfill_htlcs,
- update_fail_htlcs,
- update_fail_malformed_htlcs: Vec::new(),
- update_fee,
- commitment_signed,
- }, monitor_update)), htlcs_to_fail))
+ self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+ self.pending_monitor_updates.push(monitor_update);
+ (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
} else {
- Ok((None, Vec::new()))
+ (None, Vec::new())
}
}
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
- pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
+ pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
where L::Target: Logger,
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
// When the monitor updating is restored we'll call get_last_commitment_update(),
// which does not update state, but we're definitely now awaiting a remote revoke
// before we can step forward any more, so set it here.
- let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?;
- // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+ let mut additional_update = self.build_commitment_no_status_check(logger);
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
self.monitor_pending_failures.append(&mut revoked_htlcs);
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
- return Ok(RAAUpdates {
- commitment_update: None, finalized_claimed_htlcs: Vec::new(),
- accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
- monitor_update,
- holding_cell_failed_htlcs: Vec::new()
- });
+ self.pending_monitor_updates.push(monitor_update);
+ return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
}
- match self.free_holding_cell_htlcs(logger)? {
- (Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
- commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
- for fail_msg in update_fail_htlcs.drain(..) {
- commitment_update.update_fail_htlcs.push(fail_msg);
- }
- commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
- for fail_msg in update_fail_malformed_htlcs.drain(..) {
- commitment_update.update_fail_malformed_htlcs.push(fail_msg);
- }
-
+ match self.free_holding_cell_htlcs(logger) {
+ (Some(_), htlcs_to_fail) => {
+ let mut additional_update = self.pending_monitor_updates.pop().unwrap();
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
- Ok(RAAUpdates {
- commitment_update: Some(commitment_update),
- finalized_claimed_htlcs,
- accepted_htlcs: to_forward_infos,
- failed_htlcs: revoked_htlcs,
- monitor_update,
- holding_cell_failed_htlcs: htlcs_to_fail
- })
+ self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
+ self.pending_monitor_updates.push(monitor_update);
+ Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
},
(None, htlcs_to_fail) => {
if require_commitment {
- let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
+ let mut additional_update = self.build_commitment_no_status_check(logger);
- // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
- Ok(RAAUpdates {
- commitment_update: Some(msgs::CommitmentUpdate {
- update_add_htlcs: Vec::new(),
- update_fulfill_htlcs: Vec::new(),
- update_fail_htlcs,
- update_fail_malformed_htlcs,
- update_fee: None,
- commitment_signed
- }),
- finalized_claimed_htlcs,
- accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
- monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
- })
+ self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
+ self.pending_monitor_updates.push(monitor_update);
+ Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
} else {
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
- Ok(RAAUpdates {
- commitment_update: None,
- finalized_claimed_htlcs,
- accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
- monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
- })
+ self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
+ self.pending_monitor_updates.push(monitor_update);
+ Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
}
}
}
}
/// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted.
- /// This must be called immediately after the [`chain::Watch`] call which returned
- /// [`ChannelMonitorUpdateStatus::InProgress`].
+ /// This must be called before we return the [`ChannelMonitorUpdate`] back to the
+ /// [`ChannelManager`], which will call [`Self::monitor_updating_restored`] once the monitor
+ /// update completes (potentially immediately).
/// The messages which were generated with the monitor update must *not* have been sent to the
/// remote end, and must instead have been dropped. They will be regenerated when
/// [`Self::monitor_updating_restored`] is called.
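+ ///
+ /// Call sites in this patch queue the update and hand out a reference, roughly as follows
+ /// (a sketch; `monitor_update` is assumed to have been built already):
+ /// ```ignore
+ /// self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+ /// self.pending_monitor_updates.push(monitor_update);
+ /// return self.pending_monitor_updates.last().unwrap();
+ /// ```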
///
+ /// [`ChannelManager`]: super::channelmanager::ChannelManager
/// [`chain::Watch`]: crate::chain::Watch
/// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
- pub fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool,
+ fn monitor_updating_paused(&mut self, resend_raa: bool, resend_commitment: bool,
resend_channel_ready: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>,
mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
mut pending_finalized_claimed_htlcs: Vec<HTLCSource>
) {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
+ self.pending_monitor_updates.clear();
// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
pub fn shutdown<SP: Deref>(
&mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
- ) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
+ ) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
where SP::Target: SignerProvider
{
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
let monitor_update = if update_shutdown_script {
self.latest_monitor_update_id += 1;
- Some(ChannelMonitorUpdate {
+ let monitor_update = ChannelMonitorUpdate {
update_id: self.latest_monitor_update_id,
updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
scriptpubkey: self.get_closing_scriptpubkey(),
}],
- })
+ };
+ self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+ self.pending_monitor_updates.push(monitor_update);
+ Some(self.pending_monitor_updates.last().unwrap())
} else { None };
let shutdown = if send_shutdown {
Some(msgs::Shutdown {
(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
}
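+ /// Returns the next [`ChannelMonitorUpdate`] still awaiting persistence, if any, without
+ /// removing it from the queue.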
+ pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
+ self.pending_monitor_updates.first()
+ }
+
/// Returns true if funding_created was sent/received.
pub fn is_funding_initiated(&self) -> bool {
self.channel_state >= ChannelState::FundingSent as u32
Ok(Some(res))
}
- /// Only fails in case of bad keys
- fn send_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> Result<(msgs::CommitmentSigned, ChannelMonitorUpdate), ChannelError> where L::Target: Logger {
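+ /// Updates channel/HTLC state for a new commitment and builds the [`ChannelMonitorUpdate`]
+ /// capturing it. Unlike its `send_commitment_no_status_check` predecessor, this cannot fail.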
+ fn build_commitment_no_status_check<L: Deref>(&mut self, logger: &L) -> ChannelMonitorUpdate where L::Target: Logger {
log_trace!(logger, "Updating HTLC state for a newly-sent commitment_signed...");
// We can upgrade the status of some HTLCs that are waiting on a commitment, even if we
// fail to generate this, we still are at least at a position where upgrading their status
}
self.resend_order = RAACommitmentOrder::RevokeAndACKFirst;
- let (res, counterparty_commitment_txid, htlcs) = match self.send_commitment_no_state_update(logger) {
- Ok((res, (counterparty_commitment_tx, mut htlcs))) => {
- // Update state now that we've passed all the can-fail calls...
- let htlcs_no_ref: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> =
- htlcs.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect();
- (res, counterparty_commitment_tx, htlcs_no_ref)
- },
- Err(e) => return Err(e),
- };
+ let (counterparty_commitment_txid, mut htlcs_ref) = self.build_commitment_no_state_update(logger);
+ let htlcs: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> =
+ htlcs_ref.drain(..).map(|(htlc, htlc_source)| (htlc, htlc_source.map(|source_ref| Box::new(source_ref.clone())))).collect();
if self.announcement_sigs_state == AnnouncementSigsState::MessageSent {
self.announcement_sigs_state = AnnouncementSigsState::Committed;
}]
};
self.channel_state |= ChannelState::AwaitingRemoteRevoke as u32;
- Ok((res, monitor_update))
+ monitor_update
}
- /// Only fails in case of bad keys. Used for channel_reestablish commitment_signed generation
- /// when we shouldn't change HTLC/channel state.
- fn send_commitment_no_state_update<L: Deref>(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger {
+ fn build_commitment_no_state_update<L: Deref>(&self, logger: &L) -> (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>) where L::Target: Logger {
let counterparty_keys = self.build_remote_transaction_keys();
let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger);
let counterparty_commitment_txid = commitment_stats.tx.trust().txid();
- let (signature, htlc_signatures);
#[cfg(any(test, fuzzing))]
{
}
}
+ (counterparty_commitment_txid, commitment_stats.htlcs_included)
+ }
+
+ /// Only fails in case of signer rejection. Used for channel_reestablish commitment_signed
+ /// generation when we shouldn't change HTLC/channel state.
+ fn send_commitment_no_state_update<L: Deref>(&self, logger: &L) -> Result<(msgs::CommitmentSigned, (Txid, Vec<(HTLCOutputInCommitment, Option<&HTLCSource>)>)), ChannelError> where L::Target: Logger {
+ // Run the fee sanity checks in `build_commitment_no_state_update` (compiled only under test/fuzzing)
+ #[cfg(any(test, fuzzing))]
+ self.build_commitment_no_state_update(logger);
+
+ let counterparty_keys = self.build_remote_transaction_keys();
+ let commitment_stats = self.build_commitment_transaction(self.cur_counterparty_commitment_transaction_number, &counterparty_keys, false, true, logger);
+ let counterparty_commitment_txid = commitment_stats.tx.trust().txid();
+ let (signature, htlc_signatures);
+
{
let mut htlcs = Vec::with_capacity(commitment_stats.htlcs_included.len());
for &(ref htlc, _) in commitment_stats.htlcs_included.iter() {
}, (counterparty_commitment_txid, commitment_stats.htlcs_included)))
}
- /// Adds a pending outbound HTLC to this channel, and creates a signed commitment transaction
- /// to send to the remote peer in one go.
+ /// Adds a pending outbound HTLC to this channel, and builds a new remote commitment
+ /// transaction and generates the corresponding [`ChannelMonitorUpdate`] in one go.
///
/// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on
- /// [`Self::send_htlc`] and [`Self::send_commitment_no_state_update`] for more info.
- pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
- match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
- Some(update_add_htlc) => {
- let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
- Ok(Some((update_add_htlc, commitment_signed, monitor_update)))
+ /// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info.
+ pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
+ let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger);
+ if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
+ match send_res? {
+ Some(_) => {
+ let monitor_update = self.build_commitment_no_status_check(logger);
+ self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+ self.pending_monitor_updates.push(monitor_update);
+ Ok(Some(self.pending_monitor_updates.last().unwrap()))
},
None => Ok(None)
}
/// Begins the shutdown process, getting a message for the remote peer and returning all
/// holding cell HTLCs for payment failure.
- pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
- -> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+ ///
+ /// May jump to the channel being fully shut down (see [`Self::is_shutdown`]), in which case no
+ /// [`ChannelMonitorUpdate`] will be returned.
+ pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
+ target_feerate_sats_per_kw: Option<u32>)
+ -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
where SP::Target: SignerProvider {
for htlc in self.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
return Err(APIError::ChannelUnavailable{err: "Cannot begin shutdown while peer is disconnected or we're waiting on a monitor update, maybe force-close instead?".to_owned()});
}
+ // If we haven't funded the channel yet, we don't need to bother ensuring the shutdown
+ // script is set, we just force-close and call it a day.
+ let mut chan_closed = false;
+ if self.channel_state < ChannelState::FundingSent as u32 {
+ chan_closed = true;
+ }
+
let update_shutdown_script = match self.shutdown_scriptpubkey {
Some(_) => false,
- None => {
+ None if !chan_closed => {
let shutdown_scriptpubkey = signer_provider.get_shutdown_scriptpubkey();
if !shutdown_scriptpubkey.is_compatible(their_features) {
return Err(APIError::IncompatibleShutdownScript { script: shutdown_scriptpubkey.clone() });
self.shutdown_scriptpubkey = Some(shutdown_scriptpubkey);
true
},
+ None => false,
};
// From here on out, we may not fail!
let monitor_update = if update_shutdown_script {
self.latest_monitor_update_id += 1;
- Some(ChannelMonitorUpdate {
+ let monitor_update = ChannelMonitorUpdate {
update_id: self.latest_monitor_update_id,
updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
scriptpubkey: self.get_closing_scriptpubkey(),
}],
- })
+ };
+ self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+ self.pending_monitor_updates.push(monitor_update);
+ Some(self.pending_monitor_updates.last().unwrap())
} else { None };
let shutdown = msgs::Shutdown {
channel_id: self.channel_id,
}
});
+ debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+ "we can't both complete shutdown and return a monitor update");
+
Ok((shutdown, monitor_update, dropped_outbound_htlcs))
}
channel_type: channel_type.unwrap(),
channel_keys_id,
+
+ pending_monitor_updates: Vec::new(),
})
}
}
}]};
let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap();
- let (funding_signed_msg, _, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap();
+ let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).unwrap();
// Node B --> Node A: funding signed
let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&keys_provider, &&logger);
use crate::util::logger::{Level, Logger};
use crate::util::errors::APIError;
+use alloc::collections::BTreeMap;
+
use crate::io;
use crate::prelude::*;
use core::{cmp, mem};
}
}
#[inline]
- fn ignore_no_close(err: String) -> Self {
- Self {
- err: LightningError {
- err,
- action: msgs::ErrorAction::IgnoreError,
- },
- chan_id: None,
- shutdown_finish: None,
- }
- }
- #[inline]
fn from_no_close(err: msgs::LightningError) -> Self {
Self { err, chan_id: None, shutdown_finish: None }
}
ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
}
+#[derive(Debug)]
pub(crate) enum MonitorUpdateCompletionAction {
/// Indicates that a payment ultimately destined for us was claimed and we should emit an
/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
EmitEvent { event: events::Event },
}
+impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
+ (0, PaymentClaimed) => { (0, payment_hash, required) },
+ (2, EmitEvent) => { (0, event, ignorable) },
+);
+
/// State we hold per-peer.
pub(super) struct PeerState<Signer: ChannelSigner> {
/// `temporary_channel_id` or `channel_id` -> `channel`.
/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
/// for broadcast messages, where ordering isn't as strict).
pub(super) pending_msg_events: Vec<MessageSendEvent>,
+ /// Map from a specific channel to some action(s) that should be taken when all pending
+ /// [`ChannelMonitorUpdate`]s for the channel complete updating.
+ ///
+ /// Note that, because we generally only have one entry here, a HashMap is pretty overkill. A
+ /// BTreeMap currently stores more than ten elements per leaf node, so even up to a few
+ /// channels with a peer this will just be one allocation and will amount to a linear list of
+ /// channels to walk, avoiding the whole hashing rigmarole.
+ ///
+ /// Note that the channel may no longer exist. For example, if a channel was closed but we
+ /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
+ /// for a missing channel. While a malicious peer could construct a second channel with the
+ /// same `temporary_channel_id` (or final `channel_id` in the case of 0conf channels or prior
+ /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
+ /// duplicates do not occur, so such channels should fail without a monitor update completing.
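+ ///
+ /// Illustrative insertion (hypothetical `channel_id` and `action` bindings; mirrors the
+ /// claim path later in this patch):
+ /// ```ignore
+ /// peer_state.monitor_update_blocked_actions.entry(channel_id)
+ ///     .or_insert(Vec::new()).push(action);
+ /// ```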
+ monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
/// The peer is currently connected (i.e. we've seen a
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`]).
if require_disconnected && self.is_connected {
return false
}
- self.channel_by_id.len() == 0
+ self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
}
}
}
}
-macro_rules! handle_monitor_update_res {
- ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
- match $err {
- ChannelMonitorUpdateStatus::PermanentFailure => {
- log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..]));
- update_maps_on_chan_removal!($self, $chan);
- // TODO: $failed_fails is dropped here, which will cause other channels to hit the
- // chain in a confused state! We need to move them into the ChannelMonitor which
- // will be responsible for failing backwards once things confirm on-chain.
- // It's ok that we drop $failed_forwards here - at this point we'd rather they
- // broadcast HTLC-Timeout and pay the associated fees to get their funds back than
- // us bother trying to claim it just to forward on to another peer. If we're
- // splitting hairs we'd prefer to claim payments that were to us, but we haven't
- // given up the preimage yet, so might as well just wait until the payment is
- // retried, avoiding the on-chain fees.
- let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(),
- $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() ));
- (res, true)
- },
- ChannelMonitorUpdateStatus::InProgress => {
- log_info!($self.logger, "Disabling channel {} due to monitor update in progress. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
- log_bytes!($chan_id[..]),
- if $resend_commitment && $resend_raa {
- match $action_type {
- RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
- RAACommitmentOrder::RevokeAndACKFirst => { "RAA then commitment" },
- }
- } else if $resend_commitment { "commitment" }
- else if $resend_raa { "RAA" }
- else { "nothing" },
- (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
- (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
- (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
- if !$resend_commitment {
- debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
- }
- if !$resend_raa {
- debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
- }
- $chan.monitor_updating_paused($resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
- (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
- },
- ChannelMonitorUpdateStatus::Completed => {
- (Ok(()), false)
- },
- }
- };
- ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
- let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
- if drop {
- $entry.remove_entry();
- }
- res
- } };
- ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { {
- debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst);
- handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
- } };
- ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => {
- handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
- };
- ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => {
- handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new())
- };
- ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
- handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new())
- };
- ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
- handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new())
- };
-}
-
macro_rules! send_channel_ready {
($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{
$pending_msg_events.push(events::MessageSendEvent::SendChannelReady {
}
}
+macro_rules! handle_monitor_update_completion {
+ ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { {
+ let mut updates = $chan.monitor_updating_restored(&$self.logger,
+ &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
+ $self.best_block.read().unwrap().height());
+ let counterparty_node_id = $chan.get_counterparty_node_id();
+ let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
+ // We only send a channel_update in the case where we are just now sending a
+ // channel_ready and the channel is in a usable state. We may re-send a
+ // channel_update later through the announcement_signatures process for public
+ // channels, but there's no reason not to just inform our counterparty of our fees
+ // now.
+ if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
+ Some(events::MessageSendEvent::SendChannelUpdate {
+ node_id: counterparty_node_id,
+ msg,
+ })
+ } else { None }
+ } else { None };
+
+ let update_actions = $peer_state.monitor_update_blocked_actions
+ .remove(&$chan.channel_id()).unwrap_or(Vec::new());
+
+ let htlc_forwards = $self.handle_channel_resumption(
+ &mut $peer_state.pending_msg_events, $chan, updates.raa,
+ updates.commitment_update, updates.order, updates.accepted_htlcs,
+ updates.funding_broadcastable, updates.channel_ready,
+ updates.announcement_sigs);
+ if let Some(upd) = channel_update {
+ $peer_state.pending_msg_events.push(upd);
+ }
+
+ let channel_id = $chan.channel_id();
+ core::mem::drop($peer_state_lock);
+
+ $self.handle_monitor_update_completion_actions(update_actions);
+
+ if let Some(forwards) = htlc_forwards {
+ $self.forward_htlcs(&mut [forwards][..]);
+ }
+ $self.finalize_claims(updates.finalized_claimed_htlcs);
+ for failure in updates.failed_htlcs.drain(..) {
+ let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
+ $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+ }
+ } }
+}
+
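+// Handles the result of a `chain::Watch::update_channel` call: `InProgress` holds the channel's
+// messages until the update completes, `PermanentFailure` closes the channel, and `Completed`
+// runs `handle_monitor_update_completion!` if the completed update was the last one queued.
+// Expands to a `Result<(), MsgHandleErrInternal>` for the caller to propagate.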
+macro_rules! handle_new_monitor_update {
+ ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+ // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can
+ // take it here in any case, as otherwise we risk deadlocking.
+ debug_assert!($self.id_to_peer.try_lock().is_ok());
+ match $update_res {
+ ChannelMonitorUpdateStatus::InProgress => {
+ log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
+ log_bytes!($chan.channel_id()[..]));
+ Ok(())
+ },
+ ChannelMonitorUpdateStatus::PermanentFailure => {
+ log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
+ log_bytes!($chan.channel_id()[..]));
+ update_maps_on_chan_removal!($self, $chan);
+ let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown(
+ "ChannelMonitor storage failure".to_owned(), $chan.channel_id(),
+ $chan.get_user_id(), $chan.force_shutdown(false),
+ $self.get_channel_update_for_broadcast(&$chan).ok()));
+ $remove;
+ res
+ },
+ ChannelMonitorUpdateStatus::Completed => {
+ if ($update_id == 0 || $chan.get_next_monitor_update()
+ .expect("We can't be processing a monitor update if it isn't queued")
+ .update_id == $update_id) &&
+ $chan.get_latest_monitor_update_id() == $update_id
+ {
+ handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan);
+ }
+ Ok(())
+ },
+ }
+ } };
+ ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => {
+ handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+ }
+}
+
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
where
M::Target: chain::Watch<<SP::Target as SignerProvider>::Signer>,
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_entry) => {
- let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
+ let funding_txo_opt = chan_entry.get().get_funding_txo();
+ let their_features = &peer_state.latest_features;
+ let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+ .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?;
failed_htlcs = htlcs;
- // Update the monitor with the shutdown script if necessary.
- if let Some(monitor_update) = monitor_update {
- let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
- let (result, is_permanent) =
- handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
- if is_permanent {
- remove_channel!(self, chan_entry);
- break result;
- }
- }
-
+ // We can send the `shutdown` message before updating the `ChannelMonitor`
+ // here as we don't need the monitor update to complete until we send a
+ // `closing_signed`, which we'll delay if we're pending a monitor update.
peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
node_id: *counterparty_node_id,
- msg: shutdown_msg
+ msg: shutdown_msg,
});
+ // Update the monitor with the shutdown script if necessary.
+ if let Some(monitor_update) = monitor_update_opt.take() {
+ let update_id = monitor_update.update_id;
+ let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+ break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+ }
+
if chan_entry.get().is_shutdown() {
let channel = remove_channel!(self, chan_entry);
if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) {
- match {
- if !chan.get().is_live() {
- return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()});
- }
- break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(
- htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
- path: path.clone(),
- session_priv: session_priv.clone(),
- first_hop_htlc_msat: htlc_msat,
- payment_id,
- payment_secret: payment_secret.clone(),
- payment_params: payment_params.clone(),
- }, onion_packet, &self.logger),
- chan)
- } {
- Some((update_add, commitment_signed, monitor_update)) => {
- let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
- let chan_id = chan.get().channel_id();
- match (update_err,
- handle_monitor_update_res!(self, update_err, chan,
- RAACommitmentOrder::CommitmentFirst, false, true))
- {
- (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e),
- (ChannelMonitorUpdateStatus::Completed, Ok(())) => {},
- (ChannelMonitorUpdateStatus::InProgress, Err(_)) => {
- // Note that MonitorUpdateInProgress here indicates (per function
- // docs) that we will resend the commitment update once monitor
- // updating completes. Therefore, we must return an error
- // indicating that it is unsafe to retry the payment wholesale,
- // which we do in the send_payment check for
- // MonitorUpdateInProgress, below.
- return Err(APIError::MonitorUpdateInProgress);
- },
- _ => unreachable!(),
+ if !chan.get().is_live() {
+ return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()});
+ }
+ let funding_txo = chan.get().get_funding_txo().unwrap();
+ let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(),
+ htlc_cltv, HTLCSource::OutboundRoute {
+ path: path.clone(),
+ session_priv: session_priv.clone(),
+ first_hop_htlc_msat: htlc_msat,
+ payment_id,
+ payment_secret: payment_secret.clone(),
+ payment_params: payment_params.clone(),
+ }, onion_packet, &self.logger);
+ match break_chan_entry!(self, send_res, chan) {
+ Some(monitor_update) => {
+ let update_id = monitor_update.update_id;
+ let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
+ if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) {
+ break Err(e);
+ }
+ if update_res == ChannelMonitorUpdateStatus::InProgress {
+ // Note that MonitorUpdateInProgress here indicates (per function
+ // docs) that we will resend the commitment update once monitor
+ // updating completes. Therefore, we must return an error
+ // indicating that it is unsafe to retry the payment wholesale,
+ // which we do in the send_payment check for
+ // MonitorUpdateInProgress, below.
+ return Err(APIError::MonitorUpdateInProgress);
}
-
- log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id));
- peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: path.first().unwrap().pubkey,
- updates: msgs::CommitmentUpdate {
- update_add_htlcs: vec![update_add],
- update_fulfill_htlcs: Vec::new(),
- update_fail_htlcs: Vec::new(),
- update_fail_malformed_htlcs: Vec::new(),
- update_fee: None,
- commitment_signed,
- },
- });
},
None => { },
}
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.outpoint.to_channel_id();
-
let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
None => None
)
).unwrap_or(None);
- if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id))
- {
- let counterparty_node_id = chan.get().get_counterparty_node_id();
- match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
- Ok(msgs_monitor_option) => {
- if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
- match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
- ChannelMonitorUpdateStatus::Completed => {},
- e => {
- log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
- "Failed to update channel monitor with preimage {:?}: {:?}",
- payment_preimage, e);
- let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
- mem::drop(peer_state_opt);
- mem::drop(per_peer_state);
- self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
- return Err((counterparty_node_id, err));
- }
- }
- if let Some((msg, commitment_signed)) = msgs {
- log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
- log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
- peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: counterparty_node_id,
- updates: msgs::CommitmentUpdate {
- update_add_htlcs: Vec::new(),
- update_fulfill_htlcs: vec![msg],
- update_fail_htlcs: Vec::new(),
- update_fail_malformed_htlcs: Vec::new(),
- update_fee: None,
- commitment_signed,
- }
- });
- }
- mem::drop(peer_state_opt);
- mem::drop(per_peer_state);
- self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
- Ok(())
- } else {
- Ok(())
- }
- },
- Err((e, monitor_update)) => {
- match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
- ChannelMonitorUpdateStatus::Completed => {},
- e => {
- // TODO: This needs to be handled somehow - if we receive a monitor update
- // with a preimage we *must* somehow manage to propagate it to the upstream
- // channel, or we must have an ability to receive the same update and try
- // again on restart.
- log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
- "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
- payment_preimage, e);
- },
+ if let Some(mut peer_state_lock) = peer_state_opt.take() {
+ let peer_state = &mut *peer_state_lock;
+ if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
+ let counterparty_node_id = chan.get().get_counterparty_node_id();
+ let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
+
+ if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
+ if let Some(action) = completion_action(Some(htlc_value_msat)) {
+ log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+ log_bytes!(chan_id), action);
+ peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
- let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
- if drop {
- chan.remove_entry();
+ let update_id = monitor_update.update_id;
+ let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
+ let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+ peer_state, chan);
+ if let Err(e) = res {
+ // TODO: This is a *critical* error - we probably updated the outbound edge
+ // of the HTLC's monitor with a preimage. We should retry this monitor
+ // update over and over again until morale improves.
+ log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+ return Err((counterparty_node_id, e));
}
- mem::drop(peer_state_opt);
- mem::drop(per_peer_state);
- self.handle_monitor_update_completion_actions(completion_action(None));
- Err((counterparty_node_id, res))
- },
- }
- } else {
- let preimage_update = ChannelMonitorUpdate {
- update_id: CLOSED_CHANNEL_UPDATE_ID,
- updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
- payment_preimage,
- }],
- };
- // We update the ChannelMonitor on the backward link, after
- // receiving an `update_fulfill_htlc` from the forward link.
- let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
- if update_res != ChannelMonitorUpdateStatus::Completed {
- // TODO: This needs to be handled somehow - if we receive a monitor update
- // with a preimage we *must* somehow manage to propagate it to the upstream
- // channel, or we must have an ability to receive the same event and try
- // again on restart.
- log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
- payment_preimage, update_res);
+ }
+ return Ok(());
}
- mem::drop(peer_state_opt);
- mem::drop(per_peer_state);
- // Note that we do process the completion action here. This totally could be a
- // duplicate claim, but we have no way of knowing without interrogating the
- // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
- // generally always allowed to be duplicative (and it's specifically noted in
- // `PaymentForwarded`).
- self.handle_monitor_update_completion_actions(completion_action(None));
- Ok(())
}
+ let preimage_update = ChannelMonitorUpdate {
+ update_id: CLOSED_CHANNEL_UPDATE_ID,
+ updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
+ payment_preimage,
+ }],
+ };
+ // We update the ChannelMonitor on the backward link, after
+ // receiving an `update_fulfill_htlc` from the forward link.
+ let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+ if update_res != ChannelMonitorUpdateStatus::Completed {
+ // TODO: This needs to be handled somehow - if we receive a monitor update
+ // with a preimage we *must* somehow manage to propagate it to the upstream
+ // channel, or we must have an ability to receive the same event and try
+ // again on restart.
+ log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+ payment_preimage, update_res);
+ }
+ // Note that we do process the completion action here. This totally could be a
+ // duplicate claim, but we have no way of knowing without interrogating the
+ // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
+ // generally always allowed to be duplicative (and it's specifically noted in
+ // `PaymentForwarded`).
+ self.handle_monitor_update_completion_actions(completion_action(None));
+ Ok(())
}
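
Aside: `completion_action` is a caller-supplied closure; the code above invokes it with `Some(htlc_value_msat)` for a fresh claim and `None` when the claim may be duplicative, and the result (if any) is either deferred into `monitor_update_blocked_actions` or handled immediately. A minimal sketch of that contract, with a hypothetical action type standing in for the real one:

    // Hypothetical stand-in for the completion-action type; illustrative only.
    #[derive(Debug)]
    enum CompletionAction { EmitPaymentForwarded { claimed_msat: Option<u64> } }

    // `None` signals a possibly-duplicate claim, so anything built from the
    // action must tolerate firing more than once (as `PaymentForwarded` notes).
    fn example_completion_action(htlc_value_msat: Option<u64>) -> Option<CompletionAction> {
        Some(CompletionAction::EmitPaymentForwarded { claimed_msat: htlc_value_msat })
    }
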
fn finalize_claims(&self, sources: Vec<HTLCSource>) {
pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
+ log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
+ log_bytes!(channel.channel_id()),
+ if raa.is_some() { "an" } else { "no" },
+ if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
+ if funding_broadcastable.is_some() { "" } else { "not " },
+ if channel_ready.is_some() { "sending" } else { "without" },
+ if announcement_sigs.is_some() { "sending" } else { "without" });
+
let mut htlc_forwards = None;
let counterparty_node_id = channel.get_counterparty_node_id();
fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
- let htlc_forwards;
- let (mut pending_failures, finalized_claims, counterparty_node_id) = {
- let counterparty_node_id = match counterparty_node_id {
- Some(cp_id) => cp_id.clone(),
- None => {
- // TODO: Once we can rely on the counterparty_node_id from the
- // monitor event, this and the id_to_peer map should be removed.
- let id_to_peer = self.id_to_peer.lock().unwrap();
- match id_to_peer.get(&funding_txo.to_channel_id()) {
- Some(cp_id) => cp_id.clone(),
- None => return,
- }
- }
- };
- let per_peer_state = self.per_peer_state.read().unwrap();
- let mut peer_state_lock;
- let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
- if peer_state_mutex_opt.is_none() { return }
- peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- let mut channel = {
- match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
- hash_map::Entry::Occupied(chan) => chan,
- hash_map::Entry::Vacant(_) => return,
+ let counterparty_node_id = match counterparty_node_id {
+ Some(cp_id) => cp_id.clone(),
+ None => {
+ // TODO: Once we can rely on the counterparty_node_id from the
+ // monitor event, this and the id_to_peer map should be removed.
+ let id_to_peer = self.id_to_peer.lock().unwrap();
+ match id_to_peer.get(&funding_txo.to_channel_id()) {
+ Some(cp_id) => cp_id.clone(),
+ None => return,
}
- };
- if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
- return;
}
-
- let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height());
- let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() {
- // We only send a channel_update in the case where we are just now sending a
- // channel_ready and the channel is in a usable state. We may re-send a
- // channel_update later through the announcement_signatures process for public
- // channels, but there's no reason not to just inform our counterparty of our fees
- // now.
- if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) {
- Some(events::MessageSendEvent::SendChannelUpdate {
- node_id: channel.get().get_counterparty_node_id(),
- msg,
- })
- } else { None }
- } else { None };
- htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
- if let Some(upd) = channel_update {
- peer_state.pending_msg_events.push(upd);
+ };
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ let mut peer_state_lock;
+ let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+ if peer_state_mutex_opt.is_none() { return }
+ peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ let mut channel = {
+ match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
+ hash_map::Entry::Occupied(chan) => chan,
+ hash_map::Entry::Vacant(_) => return,
}
-
- (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id)
};
- if let Some(forwards) = htlc_forwards {
- self.forward_htlcs(&mut [forwards][..]);
- }
- self.finalize_claims(finalized_claims);
- for failure in pending_failures.drain(..) {
- let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() };
- self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+ log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
+ highest_applied_update_id, channel.get().get_latest_monitor_update_id());
+ if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
+ return;
}
+ handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut());
}
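
For context, `channel_monitor_updated` is the completion side of asynchronous monitor persistence: once a background write of a `ChannelMonitorUpdate` finishes, its id is reported back and the channel may resume. The gating condition above, reduced to a plain function (sketch only):

    // Resume only when the channel was actually paused on a monitor update and
    // the completed id is the latest one the channel has generated; an older
    // id completing means further updates are still in flight.
    fn should_resume(awaiting_monitor_update: bool, latest_update_id: u64, highest_applied_update_id: u64) -> bool {
        awaiting_monitor_update && latest_update_id == highest_applied_update_id
    }
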
/// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
}
fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
+ let best_block = *self.best_block.read().unwrap();
+
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
debug_assert!(false);
MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
})?;
- let ((funding_msg, monitor, mut channel_ready), mut chan) = {
- let best_block = *self.best_block.read().unwrap();
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
+
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ let ((funding_msg, monitor), chan) =
match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
hash_map::Entry::Occupied(mut chan) => {
(try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove())
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
- }
- };
- // Because we have exclusive ownership of the channel here we can release the peer_state
- // lock before watch_channel
- match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) {
- ChannelMonitorUpdateStatus::Completed => {},
- ChannelMonitorUpdateStatus::PermanentFailure => {
- // Note that we reply with the new channel_id in error messages if we gave up on the
- // channel, not the temporary_channel_id. This is compatible with ourselves, but the
- // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
- // any messages referencing a previously-closed channel anyway.
- // We do not propagate the monitor update to the user as it would be for a monitor
- // that we didn't manage to store (and that we don't care about - we don't respond
- // with the funding_signed so the channel can never go on chain).
- let (_monitor_update, failed_htlcs) = chan.force_shutdown(false);
- assert!(failed_htlcs.is_empty());
- return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
- },
- ChannelMonitorUpdateStatus::InProgress => {
- // There's no problem signing a counterparty's funding transaction if our monitor
- // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
- // accepted payment from yet. We do, however, need to wait to send our channel_ready
- // until we have persisted our monitor.
- chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new());
- channel_ready = None; // Don't send the channel_ready now
- },
- }
- // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the
- // peer exists, despite the inner PeerState potentially having no channels after removing
- // the channel above.
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
+ };
+
match peer_state.channel_by_id.entry(funding_msg.channel_id) {
hash_map::Entry::Occupied(_) => {
- return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
+ Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
},
hash_map::Entry::Vacant(e) => {
- let mut id_to_peer = self.id_to_peer.lock().unwrap();
- match id_to_peer.entry(chan.channel_id()) {
+ match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) {
hash_map::Entry::Occupied(_) => {
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
i_e.insert(chan.get_counterparty_node_id());
}
}
+
+ // There's no problem signing a counterparty's funding transaction if our monitor
+ // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+ // accepted payment from yet. We do, however, need to wait to send our channel_ready
+ // until we have persisted our monitor.
+ let new_channel_id = funding_msg.channel_id;
peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id: counterparty_node_id.clone(),
msg: funding_msg,
});
- if let Some(msg) = channel_ready {
- send_channel_ready!(self, peer_state.pending_msg_events, chan, msg);
+
+ let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+
+ let chan = e.insert(chan);
+ let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+
+ // Note that we reply with the new channel_id in error messages if we gave up on the
+ // channel, not the temporary_channel_id. This is compatible with ourselves, but the
+ // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
+ // any messages referencing a previously-closed channel anyway.
+ // We do not propagate the monitor update to the user as it would be for a monitor
+ // that we didn't manage to store (and that we don't care about - we don't respond
+ // with the funding_signed so the channel can never go on chain).
+ if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
+ res.0 = None;
}
- e.insert(chan);
+ res
}
}
- Ok(())
}
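
A note on the `MANUALLY_REMOVING` form: after `e.insert(chan)` the code holds a plain `&mut` into `channel_by_id` rather than an occupied entry, so the macro's usual remove-this-entry failure path is unavailable and the call site supplies its own removal keyed by `new_channel_id`. The borrow-checker shape, as a standalone sketch:

    use std::collections::HashMap;

    fn insert_then_maybe_remove(map: &mut HashMap<[u8; 32], String>, id: [u8; 32], chan: String) {
        let chan_ref = map.entry(id).or_insert(chan);
        let failed = chan_ref.is_empty(); // stand-in for a failed monitor update
        // `chan_ref`'s borrow has ended, so removal by key is legal here;
        // removing *through* the reference would not compile.
        if failed { map.remove(&id); }
    }
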
fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
- let funding_tx = {
- let best_block = *self.best_block.read().unwrap();
- let per_peer_state = self.per_peer_state.read().unwrap();
- let peer_state_mutex = per_peer_state.get(counterparty_node_id)
- .ok_or_else(|| {
- debug_assert!(false);
- MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
- })?;
+ let best_block = *self.best_block.read().unwrap();
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+ .ok_or_else(|| {
+ debug_assert!(false);
+ MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+ })?;
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- match peer_state.channel_by_id.entry(msg.channel_id) {
- hash_map::Entry::Occupied(mut chan) => {
- let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger) {
- Ok(update) => update,
- Err(e) => try_chan_entry!(self, Err(e), chan),
- };
- match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
- ChannelMonitorUpdateStatus::Completed => {},
- e => {
- let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED);
- if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
- // We weren't able to watch the channel to begin with, so no updates should be made on
- // it. Previously, full_stack_target found an (unreachable) panic when the
- // monitor update contained within `shutdown_finish` was applied.
- if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
- shutdown_finish.0.take();
- }
- }
- return res
- },
- }
- if let Some(msg) = channel_ready {
- send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg);
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ match peer_state.channel_by_id.entry(msg.channel_id) {
+ hash_map::Entry::Occupied(mut chan) => {
+ let monitor = try_chan_entry!(self,
+ chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
+ let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor);
+ let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan);
+ if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
+ // We weren't able to watch the channel to begin with, so no updates should be made on
+ // it. Previously, full_stack_target found an (unreachable) panic when the
+ // monitor update contained within `shutdown_finish` was applied.
+ if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
+ shutdown_finish.0.take();
}
- funding_tx
- },
- hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
- }
- };
- log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid());
- self.tx_broadcaster.broadcast_transaction(&funding_tx);
- Ok(())
+ }
+ res
+ },
+ hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+ }
}
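
The `shutdown_finish` surgery above is worth a gloss: if `watch_channel` failed, no monitor was ever stored, so the monitor update embedded in the shutdown result has nothing valid to apply to and is cleared before the error propagates. The shape of that fix, with hypothetical stand-in types:

    // Stand-in for the (monitor update, failed HTLCs) pair in `shutdown_finish`;
    // illustrative only, not the types in this diff.
    struct ShutdownFinish(Option<u64>, Vec<[u8; 32]>);

    fn suppress_unstorable_update(sf: &mut ShutdownFinish) {
        // The monitor was never persisted, so drop the update rather than let
        // it be applied to a monitor that does not exist.
        sf.0.take();
    }
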
fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
}
- let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
+ let funding_txo_opt = chan_entry.get().get_funding_txo();
+ let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
+ chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
dropped_htlcs = htlcs;
- // Update the monitor with the shutdown script if necessary.
- if let Some(monitor_update) = monitor_update {
- let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
- let (result, is_permanent) =
- handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
- if is_permanent {
- remove_channel!(self, chan_entry);
- break result;
- }
- }
-
if let Some(msg) = shutdown {
+ // We can send the `shutdown` message before updating the `ChannelMonitor`
+ // here as we don't need the monitor update to complete until we send a
+ // `closing_signed`, which we'll delay if we're pending a monitor update.
peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
node_id: *counterparty_node_id,
msg,
});
}
+ // Update the monitor with the shutdown script if necessary.
+ if let Some(monitor_update) = monitor_update_opt {
+ let update_id = monitor_update.update_id;
+ let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+ break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+ }
break Ok(());
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
}
- let _ = handle_error!(self, result, *counterparty_node_id);
- Ok(())
+ result
}
fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan) => {
- let (revoke_and_ack, commitment_signed, monitor_update) =
- match chan.get_mut().commitment_signed(&msg, &self.logger) {
- Err((None, e)) => try_chan_entry!(self, Err(e), chan),
- Err((Some(update), e)) => {
- assert!(chan.get().is_awaiting_monitor_update());
- let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update);
- try_chan_entry!(self, Err(e), chan);
- unreachable!();
- },
- Ok(res) => res
- };
- let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
- if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) {
- return Err(e);
- }
-
- peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
- node_id: counterparty_node_id.clone(),
- msg: revoke_and_ack,
- });
- if let Some(msg) = commitment_signed {
- peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: counterparty_node_id.clone(),
- updates: msgs::CommitmentUpdate {
- update_add_htlcs: Vec::new(),
- update_fulfill_htlcs: Vec::new(),
- update_fail_htlcs: Vec::new(),
- update_fail_malformed_htlcs: Vec::new(),
- update_fee: None,
- commitment_signed: msg,
- },
- });
- }
- Ok(())
+ let funding_txo = chan.get().get_funding_txo();
+ let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
+ let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+ let update_id = monitor_update.update_id;
+ handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+ peer_state, chan)
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
}
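
Every call site above now funnels through `handle_new_monitor_update!`. Its core policy, reduced to a plain function over the three persistence outcomes (a sketch of the decision, not the macro itself; the local enum mirrors `ChannelMonitorUpdateStatus`):

    enum UpdateStatus { Completed, InProgress, PermanentFailure }

    // Completed lets the channel proceed immediately; InProgress pauses it
    // until `channel_monitor_updated` reports completion; PermanentFailure
    // means the channel must be force-closed.
    fn update_policy(status: UpdateStatus) -> Result<bool /* paused */, &'static str> {
        match status {
            UpdateStatus::Completed => Ok(false),
            UpdateStatus::InProgress => Ok(true),
            UpdateStatus::PermanentFailure => Err("force-close the channel"),
        }
    }
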
fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
- let mut htlcs_to_fail = Vec::new();
- let res = loop {
+ let (htlcs_to_fail, res) = {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan) => {
- let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update();
- let raa_updates = break_chan_entry!(self,
- chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
- htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
- let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update);
- if was_paused_for_mon_update {
- assert!(update_res != ChannelMonitorUpdateStatus::Completed);
- assert!(raa_updates.commitment_update.is_none());
- assert!(raa_updates.accepted_htlcs.is_empty());
- assert!(raa_updates.failed_htlcs.is_empty());
- assert!(raa_updates.finalized_claimed_htlcs.is_empty());
- break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned()));
- }
- if update_res != ChannelMonitorUpdateStatus::Completed {
- if let Err(e) = handle_monitor_update_res!(self, update_res, chan,
- RAACommitmentOrder::CommitmentFirst, false,
- raa_updates.commitment_update.is_some(), false,
- raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
- raa_updates.finalized_claimed_htlcs) {
- break Err(e);
- } else { unreachable!(); }
- }
- if let Some(updates) = raa_updates.commitment_update {
- peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: counterparty_node_id.clone(),
- updates,
- });
- }
- break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
- raa_updates.finalized_claimed_htlcs,
- chan.get().get_short_channel_id()
- .unwrap_or(chan.get().outbound_scid_alias()),
- chan.get().get_funding_txo().unwrap(),
- chan.get().get_user_id()))
+ let funding_txo = chan.get().get_funding_txo();
+ let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+ let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+ let update_id = monitor_update.update_id;
+ let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+ peer_state, chan);
+ (htlcs_to_fail, res)
},
- hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
+ hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
};
self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
- match res {
- Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
- short_channel_id, channel_outpoint, user_channel_id)) =>
- {
- for failure in pending_failures.drain(..) {
- let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: channel_outpoint.to_channel_id() };
- self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
- }
- self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, user_channel_id, pending_forwards)]);
- self.finalize_claims(finalized_claim_htlcs);
- Ok(())
- },
- Err(e) => Err(e)
- }
+ res
}
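
Note the ordering: `htlcs_to_fail` is collected while the per-peer lock is held, but `fail_holding_cell_htlcs` runs only after the enclosing block has dropped that lock. The same collect-then-act idiom in miniature:

    use std::sync::Mutex;

    fn drain_then_act(pending: &Mutex<Vec<u64>>) {
        // Gather work under the lock...
        let to_fail = {
            let mut guard = pending.lock().unwrap();
            std::mem::take(&mut *guard)
        };
        // ...then act with the lock released, so the failure path can take
        // other channel locks without risking deadlock.
        for htlc in to_fail {
            println!("failing holding-cell HTLC {}", htlc);
        }
    }
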
fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
let mut has_monitor_update = false;
let mut failed_htlcs = Vec::new();
let mut handle_errors = Vec::new();
- {
- let per_peer_state = self.per_peer_state.read().unwrap();
+ let per_peer_state = self.per_peer_state.read().unwrap();
- for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+ for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+ 'chan_loop: loop {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- let pending_msg_events = &mut peer_state.pending_msg_events;
- peer_state.channel_by_id.retain(|channel_id, chan| {
- match chan.maybe_free_holding_cell_htlcs(&self.logger) {
- Ok((commitment_opt, holding_cell_failed_htlcs)) => {
- if !holding_cell_failed_htlcs.is_empty() {
- failed_htlcs.push((
- holding_cell_failed_htlcs,
- *channel_id,
- chan.get_counterparty_node_id()
- ));
- }
- if let Some((commitment_update, monitor_update)) = commitment_opt {
- match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
- ChannelMonitorUpdateStatus::Completed => {
- pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: chan.get_counterparty_node_id(),
- updates: commitment_update,
- });
- },
- e => {
- has_monitor_update = true;
- let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
- handle_errors.push((chan.get_counterparty_node_id(), res));
- if close_channel { return false; }
- },
- }
- }
- true
- },
- Err(e) => {
- let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
- handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
- // ChannelClosed event is generated by handle_error for us
- !close_channel
+ let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+ for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+ let counterparty_node_id = chan.get_counterparty_node_id();
+ let funding_txo = chan.get_funding_txo();
+ let (monitor_opt, holding_cell_failed_htlcs) =
+ chan.maybe_free_holding_cell_htlcs(&self.logger);
+ if !holding_cell_failed_htlcs.is_empty() {
+ failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+ }
+ if let Some(monitor_update) = monitor_opt {
+ has_monitor_update = true;
+
+ let update_res = self.chain_monitor.update_channel(
+ funding_txo.expect("channel is live"), monitor_update);
+ let update_id = monitor_update.update_id;
+ let channel_id: [u8; 32] = *channel_id;
+ let res = handle_new_monitor_update!(self, update_res, update_id,
+ peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
+ peer_state.channel_by_id.remove(&channel_id));
+ if res.is_err() {
+ handle_errors.push((counterparty_node_id, res));
}
+ continue 'chan_loop;
}
- });
+ }
+ break 'chan_loop;
}
}
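
The `'chan_loop` label is load-bearing: handling a freshly generated monitor update can remove the current channel from `channel_by_id`, which would invalidate the live iterator, so after any update the scan restarts from the top rather than continuing. The same pattern in isolation:

    use std::collections::HashMap;

    fn process_all(map: &mut HashMap<u32, bool>) {
        'scan: loop {
            let mut to_remove = None;
            for (&id, processed) in map.iter_mut() {
                if !*processed {
                    *processed = true;
                    if id % 2 == 0 { // stand-in for a failed monitor update
                        // Cannot mutate the map mid-iteration; record and restart.
                        to_remove = Some(id);
                        break;
                    }
                }
            }
            match to_remove {
                Some(id) => { map.remove(&id); continue 'scan; }
                None => break 'scan,
            }
        }
    }
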
channel_by_id: HashMap::new(),
latest_features: init_msg.features.clone(),
pending_msg_events: Vec::new(),
+ monitor_update_blocked_actions: BTreeMap::new(),
is_connected: true,
}));
},
htlc_purposes.push(purpose);
}
+ let mut monitor_update_blocked_actions_per_peer = None;
+ let mut peer_states = Vec::new();
+ for (_, peer_state_mutex) in per_peer_state.iter() {
+ peer_states.push(peer_state_mutex.lock().unwrap());
+ }
+
(serializable_peer_count).write(writer)?;
- for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
- let peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &*peer_state_lock;
+ for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
// Peers which we have no channels to should be dropped once disconnected. As we
// disconnect all peers when shutting down and serializing the ChannelManager, we
// consider all peers as disconnected here. There's therefore no need write peers with
if !peer_state.ok_to_remove(false) {
peer_pubkey.write(writer)?;
peer_state.latest_features.write(writer)?;
+ if !peer_state.monitor_update_blocked_actions.is_empty() {
+ monitor_update_blocked_actions_per_peer
+ .get_or_insert_with(Vec::new)
+ .push((*peer_pubkey, &peer_state.monitor_update_blocked_actions));
+ }
}
}
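
Taking every peer lock up front into `peer_states` pins a consistent snapshot: the peer count written just above cannot drift from the per-peer entries written below it. The idiom in isolation:

    use std::sync::{Mutex, MutexGuard};

    fn snapshot_all<'a>(peers: &'a [Mutex<Vec<u8>>]) -> Vec<MutexGuard<'a, Vec<u8>>> {
        // Holding all guards simultaneously freezes both the set of peers and
        // their contents for the duration of the write.
        peers.iter().map(|m| m.lock().unwrap()).collect()
    }
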
// LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
// map. Thus, if there are no entries we skip writing a TLV for it.
pending_claiming_payments = None;
- } else {
- debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR");
}
write_tlv_fields!(writer, {
(3, pending_outbound_payments, required),
(4, pending_claiming_payments, option),
(5, self.our_network_pubkey, required),
+ (6, monitor_update_blocked_actions_per_peer, option),
(7, self.fake_scid_rand_bytes, required),
(9, htlc_purposes, vec_type),
(11, self.probing_cookie_secret, required),
channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()),
latest_features: Readable::read(reader)?,
pending_msg_events: Vec::new(),
+ monitor_update_blocked_actions: BTreeMap::new(),
is_connected: false,
};
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
let mut probing_cookie_secret: Option<[u8; 32]> = None;
let mut claimable_htlc_purposes = None;
let mut pending_claiming_payments = Some(HashMap::new());
+ let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
(2, pending_intercepted_htlcs, option),
(3, pending_outbound_payments, option),
(4, pending_claiming_payments, option),
(5, received_network_pubkey, option),
+ (6, monitor_update_blocked_actions_per_peer, option),
(7, fake_scid_rand_bytes, option),
(9, claimable_htlc_purposes, vec_type),
(11, probing_cookie_secret, option),
}
}
+ for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
+ if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+ peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
+ } else {
+ log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
+ return Err(DecodeError::InvalidValue);
+ }
+ }
+
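
The restore loop above is deliberately fail-closed: blocked actions recorded for a peer with no per-peer state imply the serialized manager is inconsistent, and silently dropping them could lose a pending completion action, so deserialization aborts with `InvalidValue`. The shape of that choice:

    use std::collections::HashMap;

    fn restore_actions(
        per_peer: &mut HashMap<u32, Vec<String>>,
        actions: Vec<(u32, Vec<String>)>,
    ) -> Result<(), &'static str> {
        for (node_id, acts) in actions {
            match per_peer.get_mut(&node_id) {
                Some(slot) => *slot = acts,
                // Fail closed rather than drop a completion action on the floor.
                None => return Err("blocked actions for unknown peer"),
            }
        }
        Ok(())
    }
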
let channel_manager = ChannelManager {
genesis_hash,
fee_estimator: bounded_fee_estimator,