From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Date: Mon, 24 May 2021 21:02:50 +0000 (+0000)
Subject: Merge pull request #851 from TheBlueMatt/2021-03-holding-cell-clear-msg-get
X-Git-Tag: v0.0.98~20
X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=3a0356fe308ba29916d00f3376cd57ce6613f3d0;hp=-c;p=rust-lightning

Merge pull request #851 from TheBlueMatt/2021-03-holding-cell-clear-msg-get

Clean up and more liberally free holding cell HTLCs (without re-entrancy)
---

3a0356fe308ba29916d00f3376cd57ce6613f3d0
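Editor's note: this merge adds Channel::maybe_free_holding_cell_htlcs (a precondition-checked wrapper around free_holding_cell_htlcs) plus a ChannelManager::check_free_holding_cells pass, stops dropping holding-cell HTLC updates on peer disconnect, and has channel_reestablish hand freed holding-cell HTLCs back to the caller. The sketch below only illustrates the precondition gate added in channel.rs; the flag constants and toy Channel type are placeholders for this illustration, not rust-lightning's actual definitions.

// Standalone sketch (assumed, simplified types) of the maybe_free_holding_cell_htlcs gate.
const CHANNEL_FUNDED: u32 = 1 << 0;
const AWAITING_REMOTE_REVOKE: u32 = 1 << 1;
const PEER_DISCONNECTED: u32 = 1 << 2;
const MONITOR_UPDATE_FAILED: u32 = 1 << 3;

struct Channel {
    channel_state: u32,
    holding_cell: Vec<String>, // stand-in for holding_cell_htlc_updates
}

impl Channel {
    // A no-op unless the channel is funded and not blocked on a pending revoke,
    // a disconnected peer, or a failed monitor update -- mirroring the real check.
    fn maybe_free_holding_cell(&mut self) -> Vec<String> {
        if self.channel_state & CHANNEL_FUNDED != 0
            && self.channel_state
                & (AWAITING_REMOTE_REVOKE | PEER_DISCONNECTED | MONITOR_UPDATE_FAILED) == 0
        {
            self.holding_cell.drain(..).collect()
        } else {
            Vec::new()
        }
    }
}

fn main() {
    let mut chan = Channel { channel_state: CHANNEL_FUNDED | PEER_DISCONNECTED, holding_cell: vec!["htlc".into()] };
    assert!(chan.maybe_free_holding_cell().is_empty()); // blocked while disconnected
    chan.channel_state = CHANNEL_FUNDED;
    assert_eq!(chan.maybe_free_holding_cell().len(), 1); // safe to free now
}

In the real diff the same gate is expressed against ChannelState::ChannelFunded, AwaitingRemoteRevoke, PeerDisconnected and MonitorUpdateFailed, and the gated call is self.free_holding_cell_htlcs(logger).
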
diff --combined lightning/src/ln/channel.rs
index b940b45f,639ac844..dbc63c95
--- a/lightning/src/ln/channel.rs
+++ b/lightning/src/ln/channel.rs
@@@ -39,8 -39,9 +39,8 @@@ use util::errors::APIError
 use util::config::{UserConfig,ChannelConfig};
 use util::scid_utils::scid_from_parts;
-use std;
-use std::{cmp,mem,fmt};
-use std::ops::Deref;
+use core::{cmp,mem,fmt};
+use core::ops::Deref;
 #[cfg(any(test, feature = "fuzztarget"))] use std::sync::Mutex;
 use bitcoin::hashes::hex::ToHex;
@@@ -1219,7 -1220,7 +1219,7 @@@ impl Channel
 		// on-chain ChannelsMonitors during block rescan. Ideally we'd figure out a way to drop
 		// these, but for now we just have to treat them as normal.
-		let mut pending_idx = std::usize::MAX;
+		let mut pending_idx = core::usize::MAX;
 		for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() {
 			if htlc.htlc_id == htlc_id_arg {
 				assert_eq!(htlc.payment_hash, payment_hash_calc);
@@@ -1242,7 -1243,7 +1242,7 @@@
 				break;
 			}
 		}
-		if pending_idx == std::usize::MAX {
+		if pending_idx == core::usize::MAX {
 			return Err(ChannelError::Ignore("Unable to find a pending HTLC which matched the given HTLC ID".to_owned()));
 		}
@@@ -1331,7 -1332,7 +1331,7 @@@
 	///
 	/// Note that it is still possible to hit these assertions in case we find a preimage on-chain
 	/// but then have a reorg which settles on an HTLC-failure on chain.
-	pub fn get_update_fail_htlc(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> {
+	pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
 		if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
 			panic!("Was asked to fail an HTLC when channel was not in an operational state");
 		}
@@@ -1341,7 -1342,7 +1341,7 @@@
 		// on-chain ChannelsMonitors during block rescan. Ideally we'd figure out a way to drop
 		// these, but for now we just have to treat them as normal.
-		let mut pending_idx = std::usize::MAX;
+		let mut pending_idx = core::usize::MAX;
 		for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() {
 			if htlc.htlc_id == htlc_id_arg {
 				match htlc.state {
@@@ -1358,7 -1359,7 +1358,7 @@@
 				pending_idx = idx;
 			}
 		}
-		if pending_idx == std::usize::MAX {
+		if pending_idx == core::usize::MAX {
 			return Err(ChannelError::Ignore("Unable to find a pending HTLC which matched the given HTLC ID".to_owned()));
 		}
@@@ -1381,6 -1382,7 +1381,7 @@@
 					_ => {}
 				}
 			}
+			log_trace!(logger, "Placing failure for HTLC ID {} in holding cell", htlc_id_arg);
 			self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::FailHTLC {
 				htlc_id: htlc_id_arg,
 				err_packet,
@@@ -1388,6 -1390,7 +1389,7 @@@
 			return Ok(None);
 		}
+		log_trace!(logger, "Failing HTLC ID {} back with a update_fail_htlc message", htlc_id_arg);
 		{
 			let htlc = &mut self.pending_inbound_htlcs[pending_idx];
 			htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(err_packet.clone()));
@@@ -2307,6 -2310,16 +2309,16 @@@
 		}, commitment_signed, closing_signed, monitor_update))
 	}
+	/// Public version of the below, checking relevant preconditions first.
+	/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
+	/// returns `(None, Vec::new())`.
+	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+		if self.channel_state >= ChannelState::ChannelFunded as u32 &&
+		   (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 {
+			self.free_holding_cell_htlcs(logger)
+		} else { Ok((None, Vec::new())) }
+	}
+
 	/// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
 	/// fulfilling or failing the last pending HTLC)
 	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
@@@ -2371,7 -2384,7 +2383,7 @@@
 					}
 				},
 				&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
-					match self.get_update_fail_htlc(htlc_id, err_packet.clone()) {
+					match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
 						Ok(update_fail_msg_option) => update_fail_htlcs.push(update_fail_msg_option.unwrap()),
 						Err(e) => {
 							if let ChannelError::Ignore(_) = e {}
@@@ -2684,19 -2697,16 +2696,16 @@@
 		}
 	}
-	/// Removes any uncommitted HTLCs, to be used on peer disconnection, including any pending
-	/// HTLCs that we intended to add but haven't as we were waiting on a remote revoke.
-	/// Returns the set of PendingHTLCStatuses from remote uncommitted HTLCs (which we're
-	/// implicitly dropping) and the payment_hashes of HTLCs we tried to add but are dropping.
+	/// Removes any uncommitted inbound HTLCs and resets the state of uncommitted outbound HTLC
+	/// updates, to be used on peer disconnection. After this, update_*_htlc messages need to be
+	/// resent.
 	/// No further message handling calls may be made until a channel_reestablish dance has
 	/// completed.
-	pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) -> Vec<(HTLCSource, PaymentHash)> where L::Target: Logger {
-		let mut outbound_drops = Vec::new();
-
+	pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) where L::Target: Logger {
 		assert_eq!(self.channel_state & ChannelState::ShutdownComplete as u32, 0);
 		if self.channel_state < ChannelState::FundingSent as u32 {
 			self.channel_state = ChannelState::ShutdownComplete as u32;
-			return outbound_drops;
+			return;
 		}
 		// Upon reconnect we have to start the closing_signed dance over, but shutdown messages
 		// will be retransmitted.
@@@ -2739,23 -2749,8 +2748,8 @@@
 			}
 		}
-		self.holding_cell_htlc_updates.retain(|htlc_update| {
-			match htlc_update {
-				// Note that currently on channel reestablish we assert that there are
-				// no holding cell HTLC update_adds, so if in the future we stop
-				// dropping added HTLCs here and failing them backwards, then there will
-				// need to be corresponding changes made in the Channel's re-establish
-				// logic.
-				&HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, ref source, .. } => {
-					outbound_drops.push((source.clone(), payment_hash.clone()));
-					false
-				},
-				&HTLCUpdateAwaitingACK::ClaimHTLC {..} | &HTLCUpdateAwaitingACK::FailHTLC {..} => true,
-			}
-		});
 		self.channel_state |= ChannelState::PeerDisconnected as u32;
-		log_debug!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops and {} waiting-to-locally-announced HTLC drops on channel {}", outbound_drops.len(), inbound_drop_count, log_bytes!(self.channel_id()));
-		outbound_drops
+		log_debug!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, log_bytes!(self.channel_id()));
 	}
 	/// Indicates that a ChannelMonitor update failed to be stored by the client and further
@@@ -2914,7 -2909,7 +2908,7 @@@
 	/// May panic if some calls other than message-handling calls (which will all Err immediately)
 	/// have been called between remove_uncommitted_htlcs_and_mark_paused and this call.
-	pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, RAACommitmentOrder, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
+	pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, RAACommitmentOrder, Vec<(HTLCSource, PaymentHash)>, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
 		if self.channel_state & (ChannelState::PeerDisconnected as u32) == 0 {
 			// While BOLT 2 doesn't indicate explicitly we should error this channel here, it
 			// almost certainly indicates we are going to end up out-of-sync in some way, so we
@@@ -2965,7 -2960,7 +2959,7 @@@
 				return Err(ChannelError::Close("Peer claimed they saw a revoke_and_ack but we haven't sent funding_locked yet".to_owned()));
 			}
 			// Short circuit the whole handler as there is nothing we can resend them
-			return Ok((None, None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
+			return Ok((None, None, None, None, RAACommitmentOrder::CommitmentFirst, Vec::new(), shutdown_msg));
 		}
 		// We have OurFundingLocked set!
@@@ -2973,7 -2968,7 +2967,7 @@@
 			return Ok((Some(msgs::FundingLocked {
 				channel_id: self.channel_id(),
 				next_per_commitment_point,
-			}), None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
+			}), None, None, None, RAACommitmentOrder::CommitmentFirst, Vec::new(), shutdown_msg));
 		}
 		let required_revoke = if msg.next_remote_commitment_number + 1 == INITIAL_COMMITMENT_NUMBER - self.cur_holder_commitment_transaction_number {
@@@ -3014,14 -3009,6 +3008,6 @@@
 			}
 			if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 {
-				// Note that if in the future we no longer drop holding cell update_adds on peer
-				// disconnect, this logic will need to be updated.
-				for htlc_update in self.holding_cell_htlc_updates.iter() {
-					if let &HTLCUpdateAwaitingACK::AddHTLC { .. } = htlc_update {
-						debug_assert!(false, "There shouldn't be any add-HTLCs in the holding cell now because they should have been dropped on peer disconnect. Panic here because said HTLCs won't be handled correctly.");
-					}
-				}
-
 				// We're up-to-date and not waiting on a remote revoke (if we are our
 				// channel_reestablish should result in them sending a revoke_and_ack), but we may
 				// have received some updates while we were disconnected. Free the holding cell
@@@ -3030,20 -3017,14 +3016,14 @@@
 					Err(ChannelError::Close(msg)) => return Err(ChannelError::Close(msg)),
 					Err(ChannelError::Ignore(_)) | Err(ChannelError::CloseDelayBroadcast(_)) => panic!("Got non-channel-failing result from free_holding_cell_htlcs"),
 					Ok((Some((commitment_update, monitor_update)), htlcs_to_fail)) => {
-						// If in the future we no longer drop holding cell update_adds on peer
-						// disconnect, we may be handed some HTLCs to fail backwards here.
-						assert!(htlcs_to_fail.is_empty());
-						return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), shutdown_msg));
+						return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), htlcs_to_fail, shutdown_msg));
 					},
 					Ok((None, htlcs_to_fail)) => {
-						// If in the future we no longer drop holding cell update_adds on peer
-						// disconnect, we may be handed some HTLCs to fail backwards here.
-						assert!(htlcs_to_fail.is_empty());
-						return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
+						return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), htlcs_to_fail, shutdown_msg));
 					},
 				}
 			} else {
-				return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
+				return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), Vec::new(), shutdown_msg));
 			}
 		} else if msg.next_local_commitment_number == next_counterparty_commitment_number - 1 {
 			if required_revoke.is_some() {
@@@ -3054,10 -3035,10 +3034,10 @@@
 			if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 {
 				self.monitor_pending_commitment_signed = true;
-				return Ok((resend_funding_locked, None, None, None, self.resend_order.clone(), shutdown_msg));
+				return Ok((resend_funding_locked, None, None, None, self.resend_order.clone(), Vec::new(), shutdown_msg));
 			}
-			return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, self.resend_order.clone(), shutdown_msg));
+			return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, self.resend_order.clone(), Vec::new(), shutdown_msg));
 		} else {
 			return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old remote commitment transaction".to_owned()));
 		}
@@@ -4393,7 -4374,7 +4373,7 @@@ impl Readable for ChannelUpdateStatus
 impl<Signer: Sign> Writeable for Channel<Signer> {
 	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
 		// Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
-		// called but include holding cell updates (and obviously we don't modify self).
+		// called.
 		writer.write_all(&[SERIALIZATION_VERSION; 1])?;
 		writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
@@@ -4409,8 -4390,8 +4389,8 @@@
 		let mut key_data = VecWriter(Vec::new());
 		self.holder_signer.write(&mut key_data)?;
-		assert!(key_data.0.len() < std::usize::MAX);
-		assert!(key_data.0.len() < std::u32::MAX as usize);
+		assert!(key_data.0.len() < core::usize::MAX);
+		assert!(key_data.0.len() < core::u32::MAX as usize);
 		(key_data.0.len() as u32).write(writer)?;
 		writer.write_all(&key_data.0[..])?;
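Editor's note: the channel.rs changes above widen Channel::channel_reestablish's return tuple so that HTLCs freed from the holding cell during reestablish are returned to the caller as a Vec<(HTLCSource, PaymentHash)> instead of being asserted absent. The sketch below only shows the shape of that hand-off; the stub types and the caller function are assumptions for illustration, not the crate's API.

// Stub types standing in for rust-lightning's real ones (illustrative only).
struct HTLCSource;
struct PaymentHash(pub [u8; 32]);
struct CommitmentUpdate;
enum RAACommitmentOrder { CommitmentFirst, RevokeAndACKFirst }

// Simplified stand-in for the widened reestablish result: the commitment update
// (if any) plus the holding-cell HTLCs the caller must now fail back itself.
struct ReestablishResult {
    order: RAACommitmentOrder,
    commitment_update: Option<CommitmentUpdate>,
    htlcs_to_fail: Vec<(HTLCSource, PaymentHash)>,
}

fn handle_reestablish(res: ReestablishResult) {
    // First queue any messages derived from the result (UpdateHTLCs, RAA, ...).
    let _ = (res.order, res.commitment_update);
    // Then, once the channel lock would be released, fail the returned HTLCs
    // back through the normal path instead of asserting the Vec is empty.
    for (_source, _hash) in res.htlcs_to_fail {
        // fail_holding_cell_htlcs(...) in the real code
    }
}

fn main() {
    handle_reestablish(ReestablishResult {
        order: RAACommitmentOrder::CommitmentFirst,
        commitment_update: None,
        htlcs_to_fail: vec![(HTLCSource, PaymentHash([0; 32]))],
    });
}

The channelmanager.rs half of the diff (below) consumes this through internal_channel_reestablish, which calls fail_holding_cell_htlcs on the returned HTLCs only after the channel_state lock has been dropped.
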
diff --combined lightning/src/ln/channelmanager.rs
index fe3be2ae,9f9820c1..190fb2bc
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@@ -61,15 -61,15 +61,15 @@@ use util::chacha20::{ChaCha20, ChaChaRe
 use util::logger::Logger;
 use util::errors::APIError;
-use std::{cmp, mem};
+use core::{cmp, mem};
 use std::collections::{HashMap, hash_map, HashSet};
 use std::io::{Cursor, Read};
 use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::time::Duration;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::time::Duration;
 #[cfg(any(test, feature = "allow_wallclock_use"))]
 use std::time::Instant;
-use std::ops::Deref;
+use core::ops::Deref;
 use bitcoin::hashes::hex::ToHex;
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@@ -766,22 -766,44 +766,44 @@@ macro_rules! handle_error
 	}
 }
+/// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error)
+macro_rules! convert_chan_err {
+	($self: ident, $err: expr, $short_to_id: expr, $channel: expr, $channel_id: expr) => {
+		match $err {
+			ChannelError::Ignore(msg) => {
+				(false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
+			},
+			ChannelError::Close(msg) => {
+				log_trace!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg);
+				if let Some(short_id) = $channel.get_short_channel_id() {
+					$short_to_id.remove(&short_id);
+				}
+				let shutdown_res = $channel.force_shutdown(true);
+				(true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+			},
+			ChannelError::CloseDelayBroadcast(msg) => {
+				log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($channel_id[..]), msg);
+				if let Some(short_id) = $channel.get_short_channel_id() {
+					$short_to_id.remove(&short_id);
+				}
+				let shutdown_res = $channel.force_shutdown(false);
+				(true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+			}
+		}
+	}
+}
+
 macro_rules! break_chan_entry {
 	($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
 		match $res {
 			Ok(res) => res,
-			Err(ChannelError::Ignore(msg)) => {
-				break Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
-			},
-			Err(ChannelError::Close(msg)) => {
-				log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
-				let (channel_id, mut chan) = $entry.remove_entry();
-				if let Some(short_id) = chan.get_short_channel_id() {
-					$channel_state.short_to_id.remove(&short_id);
+			Err(e) => {
+				let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+				if drop {
+					$entry.remove_entry();
 				}
-				break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
-			},
-			Err(ChannelError::CloseDelayBroadcast(_)) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
+				break Err(res);
+			}
 		}
 	}
 }
@@@ -790,25 -812,12 +812,12 @@@ macro_rules! try_chan_entry
 	($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
 		match $res {
 			Ok(res) => res,
-			Err(ChannelError::Ignore(msg)) => {
-				return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
-			},
-			Err(ChannelError::Close(msg)) => {
-				log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
-				let (channel_id, mut chan) = $entry.remove_entry();
-				if let Some(short_id) = chan.get_short_channel_id() {
-					$channel_state.short_to_id.remove(&short_id);
+			Err(e) => {
+				let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+				if drop {
+					$entry.remove_entry();
 				}
-				return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
-			},
-			Err(ChannelError::CloseDelayBroadcast(msg)) => {
-				log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
-				let (channel_id, mut chan) = $entry.remove_entry();
-				if let Some(short_id) = chan.get_short_channel_id() {
-					$channel_state.short_to_id.remove(&short_id);
-				}
-				let shutdown_res = chan.force_shutdown(false);
-				return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
+				return Err(res);
 			}
 		}
 	}
@@@ -818,13 -827,12 +827,12 @@@ macro_rules! handle_monitor_err
 	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
 		handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
 	};
-	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+	($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
 		match $err {
 			ChannelMonitorUpdateErr::PermanentFailure => {
-				log_error!($self.logger, "Closing channel {} due to monitor update PermanentFailure", log_bytes!($entry.key()[..]));
-				let (channel_id, mut chan) = $entry.remove_entry();
-				if let Some(short_id) = chan.get_short_channel_id() {
-					$channel_state.short_to_id.remove(&short_id);
+				log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
+				if let Some(short_id) = $chan.get_short_channel_id() {
+					$short_to_id.remove(&short_id);
 				}
 				// TODO: $failed_fails is dropped here, which will cause other channels to hit the
 				// chain in a confused state! We need to move them into the ChannelMonitor which
@@@ -835,12 -843,12 +843,12 @@@
 				// splitting hairs we'd prefer to claim payments that were to us, but we haven't
 				// given up the preimage yet, so might as well just wait until the payment is
 				// retried, avoiding the on-chain fees.
-				let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()));
-				res
+				let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.force_shutdown(true), $self.get_channel_update(&$chan).ok()));
+				(res, true)
 			},
 			ChannelMonitorUpdateErr::TemporaryFailure => {
 				log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
-					log_bytes!($entry.key()[..]),
+					log_bytes!($chan_id[..]),
 					if $resend_commitment && $resend_raa {
 						match $action_type {
 							RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
@@@ -857,11 -865,18 +865,18 @@@
 				if !$resend_raa {
 					debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
 				}
-				$entry.get_mut().monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
-				Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$entry.key()))
+				$chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+				(Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
 			},
 		}
-	}
+	};
+	($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
+		let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+		if drop {
+			$entry.remove_entry();
+		}
+		res
+	} };
 }
 macro_rules! return_monitor_err {
@@@ -885,6 -900,133 +900,133 @@@ macro_rules! maybe_break_monitor_err
 	}
 }
+macro_rules! handle_chan_restoration_locked {
+	($self: ident, $channel_lock: expr, $channel_state: expr, $channel_entry: expr,
+	 $raa: expr, $commitment_update: expr, $order: expr, $chanmon_update: expr,
+	 $pending_forwards: expr, $funding_broadcastable: expr, $funding_locked: expr) => { {
+		let mut htlc_forwards = None;
+		let counterparty_node_id = $channel_entry.get().get_counterparty_node_id();
+
+		let chanmon_update: Option<ChannelMonitorUpdate> = $chanmon_update; // Force type-checking to resolve
+		let chanmon_update_is_none = chanmon_update.is_none();
+		let res = loop {
+			let forwards: Vec<(PendingHTLCInfo, u64)> = $pending_forwards; // Force type-checking to resolve
+			if !forwards.is_empty() {
+				htlc_forwards = Some(($channel_entry.get().get_short_channel_id().expect("We can't have pending forwards before funding confirmation"),
+					$channel_entry.get().get_funding_txo().unwrap(), forwards));
+			}
+
+			if chanmon_update.is_some() {
+				// On reconnect, we, by definition, only resend a funding_locked if there have been
+				// no commitment updates, so the only channel monitor update which could also be
+				// associated with a funding_locked would be the funding_created/funding_signed
+				// monitor update. That monitor update failing implies that we won't send
+				// funding_locked until it's been updated, so we can't have a funding_locked and a
+				// monitor update here (so we don't bother to handle it correctly below).
+				assert!($funding_locked.is_none());
+				// A channel monitor update makes no sense without either a funding_locked or a
+				// commitment update to process after it. Since we can't have a funding_locked, we
+				// only bother to handle the monitor-update + commitment_update case below.
+				assert!($commitment_update.is_some());
+			}
+
+			if let Some(msg) = $funding_locked {
+				// Similar to the above, this implies that we're letting the funding_locked fly
+				// before it should be allowed to.
+				assert!(chanmon_update.is_none());
+				$channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
+					node_id: counterparty_node_id,
+					msg,
+				});
+				if let Some(announcement_sigs) = $self.get_announcement_sigs($channel_entry.get()) {
+					$channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+						node_id: counterparty_node_id,
+						msg: announcement_sigs,
+					});
+				}
+				$channel_state.short_to_id.insert($channel_entry.get().get_short_channel_id().unwrap(), $channel_entry.get().channel_id());
+			}
+
+			let funding_broadcastable: Option<Transaction> = $funding_broadcastable; // Force type-checking to resolve
+			if let Some(monitor_update) = chanmon_update {
+				// We only ever broadcast a funding transaction in response to a funding_signed
+				// message and the resulting monitor update. Thus, on channel_reestablish
+				// message handling we can't have a funding transaction to broadcast. When
+				// processing a monitor update finishing resulting in a funding broadcast, we
+				// cannot have a second monitor update, thus this case would indicate a bug.
+				assert!(funding_broadcastable.is_none());
+				// Given we were just reconnected or finished updating a channel monitor, the
+				// only case where we can get a new ChannelMonitorUpdate would be if we also
+				// have some commitment updates to send as well.
+				assert!($commitment_update.is_some());
+				if let Err(e) = $self.chain_monitor.update_channel($channel_entry.get().get_funding_txo().unwrap(), monitor_update) {
+					// channel_reestablish doesn't guarantee the order it returns is sensical
+					// for the messages it returns, but if we're setting what messages to
+					// re-transmit on monitor update success, we need to make sure it is sane.
+					let mut order = $order;
+					if $raa.is_none() {
+						order = RAACommitmentOrder::CommitmentFirst;
+					}
+					break handle_monitor_err!($self, e, $channel_state, $channel_entry, order, $raa.is_some(), true);
+				}
+			}
+
+			macro_rules! handle_cs { () => {
+				if let Some(update) = $commitment_update {
+					$channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+						node_id: counterparty_node_id,
+						updates: update,
+					});
+				}
+			} }
+			macro_rules! handle_raa { () => {
+				if let Some(revoke_and_ack) = $raa {
+					$channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
+						node_id: counterparty_node_id,
+						msg: revoke_and_ack,
+					});
+				}
+			} }
+			match $order {
+				RAACommitmentOrder::CommitmentFirst => {
+					handle_cs!();
+					handle_raa!();
+				},
+				RAACommitmentOrder::RevokeAndACKFirst => {
+					handle_raa!();
+					handle_cs!();
+				},
+			}
+			if let Some(tx) = funding_broadcastable {
+				log_info!($self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
+				$self.tx_broadcaster.broadcast_transaction(&tx);
+			}
+			break Ok(());
+		};
+
+		if chanmon_update_is_none {
+			// If there was no ChannelMonitorUpdate, we should never generate an Err in the res loop
+			// above. Doing so would imply calling handle_err!() from channel_monitor_updated() which
+			// should *never* end up calling back to `chain_monitor.update_channel()`.
+			assert!(res.is_ok());
+		}
+
+		(htlc_forwards, res, counterparty_node_id)
+	} }
+}
+
+macro_rules! post_handle_chan_restoration {
+	($self: ident, $locked_res: expr) => { {
+		let (htlc_forwards, res, counterparty_node_id) = $locked_res;
+
+		let _ = handle_error!($self, res, counterparty_node_id);
+
+		if let Some(forwards) = htlc_forwards {
+			$self.forward_htlcs(&mut [forwards][..]);
+		}
+	} }
+}
+
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
 	where M::Target: chain::Watch<Signer>,
	       T::Target: BroadcasterInterface,
@@@ -1781,7 -1923,7 +1923,7 @@@
 // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
 // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
 // message...
-const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
+const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
 #[deny(const_err)]
 #[allow(dead_code)]
 // ...by failing to compile if the number of addresses that would be half of a message is
@@@ -1927,7 -2069,7 +2069,7 @@@
 						},
 						HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
 							log_trace!(self.logger, "Failing HTLC back to channel with short id {} after delay", short_chan_id);
-							match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet) {
+							match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
 								Err(e) => {
 									if let ChannelError::Ignore(msg) = e {
 										log_trace!(self.logger, "Failed to fail backwards to short_id {}: {}", short_chan_id, msg);
@@@ -2593,85 -2735,24 +2735,24 @@@
 	pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-		let mut close_results = Vec::new();
-		let mut htlc_forwards = Vec::new();
-		let mut htlc_failures = Vec::new();
-		let mut pending_events = Vec::new();
-
-		{
+		let (mut pending_failures, chan_restoration_res) = {
 			let mut channel_lock = self.channel_state.lock().unwrap();
 			let channel_state = &mut *channel_lock;
-			let short_to_id = &mut channel_state.short_to_id;
-			let pending_msg_events = &mut channel_state.pending_msg_events;
-			let channel = match channel_state.by_id.get_mut(&funding_txo.to_channel_id()) {
-				Some(chan) => chan,
-				None => return,
+			let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
+				hash_map::Entry::Occupied(chan) => chan,
+				hash_map::Entry::Vacant(_) => return,
 			};
-			if !channel.is_awaiting_monitor_update() || channel.get_latest_monitor_update_id() != highest_applied_update_id {
+			if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
 				return;
 			}
-			let (raa, commitment_update, order, pending_forwards, mut pending_failures, funding_broadcastable, funding_locked) = channel.monitor_updating_restored(&self.logger);
-			if !pending_forwards.is_empty() {
-				htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards));
-			}
-			htlc_failures.append(&mut pending_failures);
-
-			macro_rules! handle_cs { () => {
-				if let Some(update) = commitment_update {
-					pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-						node_id: channel.get_counterparty_node_id(),
-						updates: update,
-					});
-				}
-			} }
-			macro_rules! handle_raa { () => {
-				if let Some(revoke_and_ack) = raa {
-					pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-						node_id: channel.get_counterparty_node_id(),
-						msg: revoke_and_ack,
-					});
-				}
-			} }
-			match order {
-				RAACommitmentOrder::CommitmentFirst => {
-					handle_cs!();
-					handle_raa!();
-				},
-				RAACommitmentOrder::RevokeAndACKFirst => {
-					handle_raa!();
-					handle_cs!();
-				},
-			}
-			if let Some(tx) = funding_broadcastable {
-				log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
-				self.tx_broadcaster.broadcast_transaction(&tx);
-			}
-			if let Some(msg) = funding_locked {
-				pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
-					node_id: channel.get_counterparty_node_id(),
-					msg,
-				});
-				if let Some(announcement_sigs) = self.get_announcement_sigs(channel) {
-					pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
-						node_id: channel.get_counterparty_node_id(),
-						msg: announcement_sigs,
-					});
-				}
-				short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
-			}
-		}
-
-		self.pending_events.lock().unwrap().append(&mut pending_events);
-
-		for failure in htlc_failures.drain(..) {
+			let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
+			(pending_failures, handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked))
+		};
+		post_handle_chan_restoration!(self, chan_restoration_res);
+		for failure in pending_failures.drain(..) {
 			self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
 		}
-		self.forward_htlcs(&mut htlc_forwards[..]);
-
-		for res in close_results.drain(..) {
-			self.finish_force_close_channel(res);
-		}
 	}
 	fn internal_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
@@@ -3282,77 -3363,35 +3363,35 @@@
 	}
 	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
-		let mut channel_state_lock = self.channel_state.lock().unwrap();
-		let channel_state = &mut *channel_state_lock;
+		let (htlcs_failed_forward, chan_restoration_res) = {
+			let mut channel_state_lock = self.channel_state.lock().unwrap();
+			let channel_state = &mut *channel_state_lock;
-		match channel_state.by_id.entry(msg.channel_id) {
-			hash_map::Entry::Occupied(mut chan) => {
-				if chan.get().get_counterparty_node_id() != *counterparty_node_id {
-					return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
-				}
-				// Currently, we expect all holding cell update_adds to be dropped on peer
-				// disconnect, so Channel's reestablish will never hand us any holding cell
-				// freed HTLCs to fail backwards. If in the future we no longer drop pending
-				// add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
-				let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
-					try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
-				if let Some(monitor_update) = monitor_update_opt {
-					if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-						// channel_reestablish doesn't guarantee the order it returns is sensical
-						// for the messages it returns, but if we're setting what messages to
-						// re-transmit on monitor update success, we need to make sure it is sane.
-						if revoke_and_ack.is_none() {
-							order = RAACommitmentOrder::CommitmentFirst;
-						}
-						if commitment_update.is_none() {
-							order = RAACommitmentOrder::RevokeAndACKFirst;
-						}
-						return_monitor_err!(self, e, channel_state, chan, order, revoke_and_ack.is_some(), commitment_update.is_some());
-						//TODO: Resend the funding_locked if needed once we get the monitor running again
-					}
-				}
-				if let Some(msg) = funding_locked {
-					channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
-						node_id: counterparty_node_id.clone(),
-						msg
-					});
-				}
-				macro_rules! send_raa { () => {
-					if let Some(msg) = revoke_and_ack {
-						channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-							node_id: counterparty_node_id.clone(),
-							msg
-						});
+			match channel_state.by_id.entry(msg.channel_id) {
+				hash_map::Entry::Occupied(mut chan) => {
+					if chan.get().get_counterparty_node_id() != *counterparty_node_id {
+						return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
 					}
-				} }
-				macro_rules! send_cu { () => {
-					if let Some(updates) = commitment_update {
-						channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+					// Currently, we expect all holding cell update_adds to be dropped on peer
+					// disconnect, so Channel's reestablish will never hand us any holding cell
+					// freed HTLCs to fail backwards. If in the future we no longer drop pending
+					// add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
+					let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, order, htlcs_failed_forward, shutdown) =
+						try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
+					if let Some(msg) = shutdown {
+						channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
 							node_id: counterparty_node_id.clone(),
-							updates
+							msg,
 						});
 					}
-				} }
-				match order {
-					RAACommitmentOrder::RevokeAndACKFirst => {
-						send_raa!();
-						send_cu!();
-					},
-					RAACommitmentOrder::CommitmentFirst => {
-						send_cu!();
-						send_raa!();
-					},
-				}
-				if let Some(msg) = shutdown {
-					channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-						node_id: counterparty_node_id.clone(),
-						msg,
-					});
-				}
-				Ok(())
-			},
-			hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
-		}
+					(htlcs_failed_forward, handle_chan_restoration_locked!(self, channel_state_lock, channel_state, chan, revoke_and_ack, commitment_update, order, monitor_update_opt, Vec::new(), None, funding_locked))
+				},
+				hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+			}
+		};
+		post_handle_chan_restoration!(self, chan_restoration_res);
+		self.fail_holding_cell_htlcs(htlcs_failed_forward, msg.channel_id);
+		Ok(())
 	}
 	/// Begin Update fee process. Allowed only on an outbound channel.
@@@ -3458,6 -3497,57 +3497,57 @@@
 		}
 	}
+	/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
+	/// This should only apply to HTLCs which were added to the holding cell because we were
+	/// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
+	/// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
+	/// code to inform them of a channel monitor update.
+	fn check_free_holding_cells(&self) {
+		let mut failed_htlcs = Vec::new();
+		let mut handle_errors = Vec::new();
+		{
+			let mut channel_state_lock = self.channel_state.lock().unwrap();
+			let channel_state = &mut *channel_state_lock;
+			let by_id = &mut channel_state.by_id;
+			let short_to_id = &mut channel_state.short_to_id;
+			let pending_msg_events = &mut channel_state.pending_msg_events;
+
+			by_id.retain(|channel_id, chan| {
+				match chan.maybe_free_holding_cell_htlcs(&self.logger) {
+					Ok((None, ref htlcs)) if htlcs.is_empty() => true,
+					Ok((commitment_opt, holding_cell_failed_htlcs)) => {
+						failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+						if let Some((commitment_update, monitor_update)) = commitment_opt {
+							if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+								let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+								handle_errors.push((chan.get_counterparty_node_id(), res));
+								if close_channel { return false; }
+							} else {
+								pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+									node_id: chan.get_counterparty_node_id(),
+									updates: commitment_update,
+								});
+							}
+						}
+						true
+					},
+					Err(e) => {
+						let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
+						handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+						!close_channel
+					}
+				}
+			});
+		}
+		for (failures, channel_id) in failed_htlcs.drain(..) {
+			self.fail_holding_cell_htlcs(failures, channel_id);
+		}
+
+		for (counterparty_node_id, err) in handle_errors.drain(..) {
+			let _ = handle_error!(self, err, counterparty_node_id);
+		}
+	}
+
 	/// Handle a list of channel failures during a block_connected or block_disconnected call,
 	/// pushing the channel monitor update (if any) to the background events queue and removing the
 	/// Channel object.
@@@ -3594,6 -3684,8 +3684,8 @@@ impl
 (node_a: &'a Node<'b, 'c, 'd>, node_b: &'a Node<'b, 'c, 'd>, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
-	let conf_height = std::cmp::max(node_a.best_block_info().1 + 1, node_b.best_block_info().1 + 1);
+	let conf_height = core::cmp::max(node_a.best_block_info().1 + 1, node_b.best_block_info().1 + 1);
 	create_chan_between_nodes_with_value_confirm_first(node_a, node_b, tx, conf_height);
 	confirm_transaction_at(node_a, tx, conf_height);
 	connect_blocks(node_a, CHAN_CONFIRM_DEPTH - 1);
@@@ -1515,6 -1515,11 +1515,11 @@@ macro_rules! handle_chan_reestablish_ms
 		None
 	};
+	if let Some(&MessageSendEvent::SendAnnouncementSignatures { ref node_id, msg: _ }) = msg_events.get(idx) {
+		idx += 1;
+		assert_eq!(*node_id, $dst_node.node.get_our_node_id());
+	}
+
 	let mut revoke_and_ack = None;
 	let mut commitment_update = None;
 	let order = if let Some(ev) = msg_events.get(idx) {
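
Editor's note on the "without re-entrancy" part of the commit title: channel_monitor_updated and internal_channel_reestablish now gather their results while the channel_state lock is held (handle_chan_restoration_locked!) and only afterwards run the steps that can call back into locking code (post_handle_chan_restoration!, fail_holding_cell_htlcs, handle_error!); check_free_holding_cells follows the same collect-then-act shape. The sketch below is a toy version of that pattern; the Manager type and its holding-cell model are assumptions for illustration, not the real ChannelManager.

use std::sync::Mutex;

// Toy collect-then-act pattern: mutate state and collect follow-up work under
// the lock, then perform the follow-up only after the lock has been dropped so
// it can safely re-enter code that takes the same lock.
struct Manager {
    channels: Mutex<Vec<Vec<u64>>>, // per-channel holding cells (toy model)
}

impl Manager {
    fn check_free_holding_cells(&self) {
        let mut freed: Vec<(usize, Vec<u64>)> = Vec::new();
        {
            let mut channels = self.channels.lock().unwrap();
            for (idx, cell) in channels.iter_mut().enumerate() {
                if !cell.is_empty() {
                    freed.push((idx, cell.drain(..).collect()));
                }
            }
        } // lock dropped here
        for (idx, htlcs) in freed {
            // In the real code this step is fail_holding_cell_htlcs / handle_error!,
            // which may take the channel lock again; running it here avoids re-entrancy.
            println!("channel {}: {} holding-cell HTLCs freed", idx, htlcs.len());
        }
    }
}

fn main() {
    let mgr = Manager { channels: Mutex::new(vec![vec![1, 2], vec![]]) };
    mgr.check_free_holding_cells();
}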