use util::config::{UserConfig,ChannelConfig};
use util::scid_utils::scid_from_parts;
-use std;
-use std::{cmp,mem,fmt};
-use std::ops::Deref;
+use core::{cmp,mem,fmt};
+use core::ops::Deref;
#[cfg(any(test, feature = "fuzztarget"))]
use std::sync::Mutex;
use bitcoin::hashes::hex::ToHex;
// on-chain ChannelsMonitors during block rescan. Ideally we'd figure out a way to drop
// these, but for now we just have to treat them as normal.
- let mut pending_idx = std::usize::MAX;
+ let mut pending_idx = core::usize::MAX;
for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() {
if htlc.htlc_id == htlc_id_arg {
assert_eq!(htlc.payment_hash, payment_hash_calc);
break;
}
}
- if pending_idx == std::usize::MAX {
+ if pending_idx == core::usize::MAX {
return Err(ChannelError::Ignore("Unable to find a pending HTLC which matched the given HTLC ID".to_owned()));
}
///
/// Note that it is still possible to hit these assertions in case we find a preimage on-chain
/// but then have a reorg which settles on an HTLC-failure on chain.
- pub fn get_update_fail_htlc(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> {
+ pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
panic!("Was asked to fail an HTLC when channel was not in an operational state");
}
// on-chain ChannelsMonitors during block rescan. Ideally we'd figure out a way to drop
// these, but for now we just have to treat them as normal.
- let mut pending_idx = std::usize::MAX;
+ let mut pending_idx = core::usize::MAX;
for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() {
if htlc.htlc_id == htlc_id_arg {
match htlc.state {
pending_idx = idx;
}
}
- if pending_idx == std::usize::MAX {
+ if pending_idx == core::usize::MAX {
return Err(ChannelError::Ignore("Unable to find a pending HTLC which matched the given HTLC ID".to_owned()));
}
_ => {}
}
}
+ log_trace!(logger, "Placing failure for HTLC ID {} in holding cell", htlc_id_arg);
self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::FailHTLC {
htlc_id: htlc_id_arg,
err_packet,
return Ok(None);
}
+ log_trace!(logger, "Failing HTLC ID {} back with a update_fail_htlc message", htlc_id_arg);
{
let htlc = &mut self.pending_inbound_htlcs[pending_idx];
htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(err_packet.clone()));
}, commitment_signed, closing_signed, monitor_update))
}
+ /// Public version of free_holding_cell_htlcs below, checking relevant preconditions first.
+ /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
+ /// returns `(None, Vec::new())`.
+ pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
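+ // Only attempt to free the holding cell once the channel is funded and we are not blocked
+ // waiting on a remote revoke_and_ack, a peer reconnection, or a failed monitor update.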
+ if self.channel_state >= ChannelState::ChannelFunded as u32 &&
+ (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 {
+ self.free_holding_cell_htlcs(logger)
+ } else { Ok((None, Vec::new())) }
+ }
+
/// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
/// fulfilling or failing the last pending HTLC)
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
}
},
&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
- match self.get_update_fail_htlc(htlc_id, err_packet.clone()) {
+ match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
Ok(update_fail_msg_option) => update_fail_htlcs.push(update_fail_msg_option.unwrap()),
Err(e) => {
if let ChannelError::Ignore(_) = e {}
}
}
- /// Removes any uncommitted HTLCs, to be used on peer disconnection, including any pending
- /// HTLCs that we intended to add but haven't as we were waiting on a remote revoke.
- /// Returns the set of PendingHTLCStatuses from remote uncommitted HTLCs (which we're
- /// implicitly dropping) and the payment_hashes of HTLCs we tried to add but are dropping.
+ /// Removes any uncommitted inbound HTLCs and resets the state of uncommitted outbound HTLC
+ /// updates, to be used on peer disconnection. After this, update_*_htlc messages need to be
+ /// resent.
/// No further message handling calls may be made until a channel_reestablish dance has
/// completed.
- pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) -> Vec<(HTLCSource, PaymentHash)> where L::Target: Logger {
- let mut outbound_drops = Vec::new();
-
+ pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::ShutdownComplete as u32, 0);
if self.channel_state < ChannelState::FundingSent as u32 {
self.channel_state = ChannelState::ShutdownComplete as u32;
- return outbound_drops;
+ return;
}
// Upon reconnect we have to start the closing_signed dance over, but shutdown messages
// will be retransmitted.
}
}
- self.holding_cell_htlc_updates.retain(|htlc_update| {
- match htlc_update {
- // Note that currently on channel reestablish we assert that there are
- // no holding cell HTLC update_adds, so if in the future we stop
- // dropping added HTLCs here and failing them backwards, then there will
- // need to be corresponding changes made in the Channel's re-establish
- // logic.
- &HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, ref source, .. } => {
- outbound_drops.push((source.clone(), payment_hash.clone()));
- false
- },
- &HTLCUpdateAwaitingACK::ClaimHTLC {..} | &HTLCUpdateAwaitingACK::FailHTLC {..} => true,
- }
- });
self.channel_state |= ChannelState::PeerDisconnected as u32;
- log_debug!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops and {} waiting-to-locally-announced HTLC drops on channel {}", outbound_drops.len(), inbound_drop_count, log_bytes!(self.channel_id()));
- outbound_drops
+ log_debug!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, log_bytes!(self.channel_id()));
}
/// Indicates that a ChannelMonitor update failed to be stored by the client and further
/// May panic if some calls other than message-handling calls (which will all Err immediately)
/// have been called between remove_uncommitted_htlcs_and_mark_paused and this call.
- pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, RAACommitmentOrder, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
+ pub fn channel_reestablish<L: Deref>(&mut self, msg: &msgs::ChannelReestablish, logger: &L) -> Result<(Option<msgs::FundingLocked>, Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, Option<ChannelMonitorUpdate>, RAACommitmentOrder, Vec<(HTLCSource, PaymentHash)>, Option<msgs::Shutdown>), ChannelError> where L::Target: Logger {
if self.channel_state & (ChannelState::PeerDisconnected as u32) == 0 {
// While BOLT 2 doesn't indicate explicitly we should error this channel here, it
// almost certainly indicates we are going to end up out-of-sync in some way, so we
return Err(ChannelError::Close("Peer claimed they saw a revoke_and_ack but we haven't sent funding_locked yet".to_owned()));
}
// Short circuit the whole handler as there is nothing we can resend them
- return Ok((None, None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
+ return Ok((None, None, None, None, RAACommitmentOrder::CommitmentFirst, Vec::new(), shutdown_msg));
}
// We have OurFundingLocked set!
return Ok((Some(msgs::FundingLocked {
channel_id: self.channel_id(),
next_per_commitment_point,
- }), None, None, None, RAACommitmentOrder::CommitmentFirst, shutdown_msg));
+ }), None, None, None, RAACommitmentOrder::CommitmentFirst, Vec::new(), shutdown_msg));
}
let required_revoke = if msg.next_remote_commitment_number + 1 == INITIAL_COMMITMENT_NUMBER - self.cur_holder_commitment_transaction_number {
}
if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 {
- // Note that if in the future we no longer drop holding cell update_adds on peer
- // disconnect, this logic will need to be updated.
- for htlc_update in self.holding_cell_htlc_updates.iter() {
- if let &HTLCUpdateAwaitingACK::AddHTLC { .. } = htlc_update {
- debug_assert!(false, "There shouldn't be any add-HTLCs in the holding cell now because they should have been dropped on peer disconnect. Panic here because said HTLCs won't be handled correctly.");
- }
- }
-
// We're up-to-date and not waiting on a remote revoke (if we are our
// channel_reestablish should result in them sending a revoke_and_ack), but we may
// have received some updates while we were disconnected. Free the holding cell
Err(ChannelError::Close(msg)) => return Err(ChannelError::Close(msg)),
Err(ChannelError::Ignore(_)) | Err(ChannelError::CloseDelayBroadcast(_)) => panic!("Got non-channel-failing result from free_holding_cell_htlcs"),
Ok((Some((commitment_update, monitor_update)), htlcs_to_fail)) => {
- // If in the future we no longer drop holding cell update_adds on peer
- // disconnect, we may be handed some HTLCs to fail backwards here.
- assert!(htlcs_to_fail.is_empty());
- return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), shutdown_msg));
+ return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), htlcs_to_fail, shutdown_msg));
},
Ok((None, htlcs_to_fail)) => {
- // If in the future we no longer drop holding cell update_adds on peer
- // disconnect, we may be handed some HTLCs to fail backwards here.
- assert!(htlcs_to_fail.is_empty());
- return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
+ return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), htlcs_to_fail, shutdown_msg));
},
}
} else {
- return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
+ return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), Vec::new(), shutdown_msg));
}
} else if msg.next_local_commitment_number == next_counterparty_commitment_number - 1 {
if required_revoke.is_some() {
if self.channel_state & (ChannelState::MonitorUpdateFailed as u32) != 0 {
self.monitor_pending_commitment_signed = true;
- return Ok((resend_funding_locked, None, None, None, self.resend_order.clone(), shutdown_msg));
+ return Ok((resend_funding_locked, None, None, None, self.resend_order.clone(), Vec::new(), shutdown_msg));
}
- return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, self.resend_order.clone(), shutdown_msg));
+ return Ok((resend_funding_locked, required_revoke, Some(self.get_last_commitment_update(logger)), None, self.resend_order.clone(), Vec::new(), shutdown_msg));
} else {
return Err(ChannelError::Close("Peer attempted to reestablish channel with a very old remote commitment transaction".to_owned()));
}
impl<Signer: Sign> Writeable for Channel<Signer> {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
// Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
- // called but include holding cell updates (and obviously we don't modify self).
+ // called.
writer.write_all(&[SERIALIZATION_VERSION; 1])?;
writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
let mut key_data = VecWriter(Vec::new());
self.holder_signer.write(&mut key_data)?;
- assert!(key_data.0.len() < std::usize::MAX);
- assert!(key_data.0.len() < std::u32::MAX as usize);
+ assert!(key_data.0.len() < core::usize::MAX);
+ assert!(key_data.0.len() < core::u32::MAX as usize);
(key_data.0.len() as u32).write(writer)?;
writer.write_all(&key_data.0[..])?;
use util::logger::Logger;
use util::errors::APIError;
-use std::{cmp, mem};
+use core::{cmp, mem};
use std::collections::{HashMap, hash_map, HashSet};
use std::io::{Cursor, Read};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::time::Duration;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::time::Duration;
#[cfg(any(test, feature = "allow_wallclock_use"))]
use std::time::Instant;
-use std::ops::Deref;
+use core::ops::Deref;
use bitcoin::hashes::hex::ToHex;
// We hold various information about HTLC relay in the HTLC objects in Channel itself:
}
}
+ /// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error)
+ macro_rules! convert_chan_err {
+ ($self: ident, $err: expr, $short_to_id: expr, $channel: expr, $channel_id: expr) => {
+ match $err {
+ ChannelError::Ignore(msg) => {
+ (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
+ },
+ ChannelError::Close(msg) => {
+ log_trace!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg);
+ if let Some(short_id) = $channel.get_short_channel_id() {
+ $short_to_id.remove(&short_id);
+ }
+ let shutdown_res = $channel.force_shutdown(true);
+ (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+ },
+ ChannelError::CloseDelayBroadcast(msg) => {
+ log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($channel_id[..]), msg);
+ if let Some(short_id) = $channel.get_short_channel_id() {
+ $short_to_id.remove(&short_id);
+ }
+ let shutdown_res = $channel.force_shutdown(false);
+ (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+ }
+ }
+ }
+ }
+
macro_rules! break_chan_entry {
($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
match $res {
Ok(res) => res,
- Err(ChannelError::Ignore(msg)) => {
- break Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
- },
- Err(ChannelError::Close(msg)) => {
- log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
- let (channel_id, mut chan) = $entry.remove_entry();
- if let Some(short_id) = chan.get_short_channel_id() {
- $channel_state.short_to_id.remove(&short_id);
+ Err(e) => {
+ let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+ if drop {
+ $entry.remove_entry();
}
- break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
- },
- Err(ChannelError::CloseDelayBroadcast(_)) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
+ break Err(res);
+ }
}
}
}
($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
match $res {
Ok(res) => res,
- Err(ChannelError::Ignore(msg)) => {
- return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
- },
- Err(ChannelError::Close(msg)) => {
- log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
- let (channel_id, mut chan) = $entry.remove_entry();
- if let Some(short_id) = chan.get_short_channel_id() {
- $channel_state.short_to_id.remove(&short_id);
+ Err(e) => {
+ let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+ if drop {
+ $entry.remove_entry();
}
- return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
- },
- Err(ChannelError::CloseDelayBroadcast(msg)) => {
- log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
- let (channel_id, mut chan) = $entry.remove_entry();
- if let Some(short_id) = chan.get_short_channel_id() {
- $channel_state.short_to_id.remove(&short_id);
- }
- let shutdown_res = chan.force_shutdown(false);
- return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
+ return Err(res);
}
}
}
($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
};
- ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
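+ // This arm takes the Channel and the short_to_id map directly (rather than a by_id map
+ // Entry) and returns (result, should_drop_channel) so that callers iterating over the
+ // channel map can remove the channel themselves.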
+ ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
match $err {
ChannelMonitorUpdateErr::PermanentFailure => {
- log_error!($self.logger, "Closing channel {} due to monitor update PermanentFailure", log_bytes!($entry.key()[..]));
- let (channel_id, mut chan) = $entry.remove_entry();
- if let Some(short_id) = chan.get_short_channel_id() {
- $channel_state.short_to_id.remove(&short_id);
+ log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
+ if let Some(short_id) = $chan.get_short_channel_id() {
+ $short_to_id.remove(&short_id);
}
// TODO: $failed_fails is dropped here, which will cause other channels to hit the
// chain in a confused state! We need to move them into the ChannelMonitor which
// splitting hairs we'd prefer to claim payments that were to us, but we haven't
// given up the preimage yet, so might as well just wait until the payment is
// retried, avoiding the on-chain fees.
- let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()));
- res
+ let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.force_shutdown(true), $self.get_channel_update(&$chan).ok()));
+ (res, true)
},
ChannelMonitorUpdateErr::TemporaryFailure => {
log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
- log_bytes!($entry.key()[..]),
+ log_bytes!($chan_id[..]),
if $resend_commitment && $resend_raa {
match $action_type {
RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
if !$resend_raa {
debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
}
- $entry.get_mut().monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
- Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$entry.key()))
+ $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+ (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
},
}
- }
+ };
+ ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
+ let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+ if drop {
+ $entry.remove_entry();
+ }
+ res
+ } };
}
macro_rules! return_monitor_err {
}
}
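+ /// Handles the results of bringing a channel back to life, either after a successful
+ /// ChannelMonitor update or on channel_reestablish: queues any funding_locked (and
+ /// announcement signatures), revoke_and_ack and commitment update messages in the requested
+ /// order, applies a new ChannelMonitorUpdate if one was produced, and broadcasts any funding
+ /// transaction. Returns the data needed by post_handle_chan_restoration!, which must run once
+ /// the channel_state lock has been released.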
+ macro_rules! handle_chan_restoration_locked {
+ ($self: ident, $channel_lock: expr, $channel_state: expr, $channel_entry: expr,
+ $raa: expr, $commitment_update: expr, $order: expr, $chanmon_update: expr,
+ $pending_forwards: expr, $funding_broadcastable: expr, $funding_locked: expr) => { {
+ let mut htlc_forwards = None;
+ let counterparty_node_id = $channel_entry.get().get_counterparty_node_id();
+
+ let chanmon_update: Option<ChannelMonitorUpdate> = $chanmon_update; // Force type-checking to resolve
+ let chanmon_update_is_none = chanmon_update.is_none();
+ let res = loop {
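+ // `loop` is used here purely as a block we can `break` out of early with an error result;
+ // it runs exactly once.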
+ let forwards: Vec<(PendingHTLCInfo, u64)> = $pending_forwards; // Force type-checking to resolve
+ if !forwards.is_empty() {
+ htlc_forwards = Some(($channel_entry.get().get_short_channel_id().expect("We can't have pending forwards before funding confirmation"),
+ $channel_entry.get().get_funding_txo().unwrap(), forwards));
+ }
+
+ if chanmon_update.is_some() {
+ // On reconnect, we, by definition, only resend a funding_locked if there have been
+ // no commitment updates, so the only channel monitor update which could also be
+ // associated with a funding_locked would be the funding_created/funding_signed
+ // monitor update. That monitor update failing implies that we won't send
+ // funding_locked until it's been updated, so we can't have a funding_locked and a
+ // monitor update here (so we don't bother to handle it correctly below).
+ assert!($funding_locked.is_none());
+ // A channel monitor update makes no sense without either a funding_locked or a
+ // commitment update to process after it. Since we can't have a funding_locked, we
+ // only bother to handle the monitor-update + commitment_update case below.
+ assert!($commitment_update.is_some());
+ }
+
+ if let Some(msg) = $funding_locked {
+ // Similar to the above, this implies that we're letting the funding_locked fly
+ // before it should be allowed to.
+ assert!(chanmon_update.is_none());
+ $channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
+ node_id: counterparty_node_id,
+ msg,
+ });
+ if let Some(announcement_sigs) = $self.get_announcement_sigs($channel_entry.get()) {
+ $channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+ node_id: counterparty_node_id,
+ msg: announcement_sigs,
+ });
+ }
+ $channel_state.short_to_id.insert($channel_entry.get().get_short_channel_id().unwrap(), $channel_entry.get().channel_id());
+ }
+
+ let funding_broadcastable: Option<Transaction> = $funding_broadcastable; // Force type-checking to resolve
+ if let Some(monitor_update) = chanmon_update {
+ // We only ever broadcast a funding transaction in response to a funding_signed
+ // message and the resulting monitor update. Thus, on channel_reestablish
+ // message handling we can't have a funding transaction to broadcast. When
+ // processing a monitor update finishing resulting in a funding broadcast, we
+ // cannot have a second monitor update, thus this case would indicate a bug.
+ assert!(funding_broadcastable.is_none());
+ // Given we were just reconnected or finished updating a channel monitor, the
+ // only case where we can get a new ChannelMonitorUpdate would be if we also
+ // have some commitment updates to send as well.
+ assert!($commitment_update.is_some());
+ if let Err(e) = $self.chain_monitor.update_channel($channel_entry.get().get_funding_txo().unwrap(), monitor_update) {
+ // channel_reestablish doesn't guarantee the order in which it returns messages
+ // is sensible, but if we're setting what messages to re-transmit on monitor
+ // update success, we need to make sure the order is sane.
+ let mut order = $order;
+ if $raa.is_none() {
+ order = RAACommitmentOrder::CommitmentFirst;
+ }
+ break handle_monitor_err!($self, e, $channel_state, $channel_entry, order, $raa.is_some(), true);
+ }
+ }
+
+ macro_rules! handle_cs { () => {
+ if let Some(update) = $commitment_update {
+ $channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+ node_id: counterparty_node_id,
+ updates: update,
+ });
+ }
+ } }
+ macro_rules! handle_raa { () => {
+ if let Some(revoke_and_ack) = $raa {
+ $channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
+ node_id: counterparty_node_id,
+ msg: revoke_and_ack,
+ });
+ }
+ } }
+ match $order {
+ RAACommitmentOrder::CommitmentFirst => {
+ handle_cs!();
+ handle_raa!();
+ },
+ RAACommitmentOrder::RevokeAndACKFirst => {
+ handle_raa!();
+ handle_cs!();
+ },
+ }
+ if let Some(tx) = funding_broadcastable {
+ log_info!($self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
+ $self.tx_broadcaster.broadcast_transaction(&tx);
+ }
+ break Ok(());
+ };
+
+ if chanmon_update_is_none {
+ // If there was no ChannelMonitorUpdate, we should never generate an Err in the res loop
+ // above. Doing so would imply calling handle_error!() from channel_monitor_updated(), which
+ // should *never* end up calling back to `chain_monitor.update_channel()`.
+ assert!(res.is_ok());
+ }
+
+ (htlc_forwards, res, counterparty_node_id)
+ } }
+ }
+
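+ /// Companion to handle_chan_restoration_locked! which must run after the channel_state lock
+ /// has been dropped: it surfaces any error via handle_error! and forwards the HTLCs collected
+ /// while the lock was held.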
+ macro_rules! post_handle_chan_restoration {
+ ($self: ident, $locked_res: expr) => { {
+ let (htlc_forwards, res, counterparty_node_id) = $locked_res;
+
+ let _ = handle_error!($self, res, counterparty_node_id);
+
+ if let Some(forwards) = htlc_forwards {
+ $self.forward_htlcs(&mut [forwards][..]);
+ }
+ } }
+ }
+
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
where M::Target: chain::Watch<Signer>,
T::Target: BroadcasterInterface,
// be absurd. We ensure this by checking that at least 500 (our stated public contract on when
// broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
// message...
- const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
+ const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
#[deny(const_err)]
#[allow(dead_code)]
// ...by failing to compile if the number of addresses that would be half of a message is
},
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
log_trace!(self.logger, "Failing HTLC back to channel with short id {} after delay", short_chan_id);
- match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet) {
+ match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
Err(e) => {
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to fail backwards to short_id {}: {}", short_chan_id, msg);
pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
- let mut close_results = Vec::new();
- let mut htlc_forwards = Vec::new();
- let mut htlc_failures = Vec::new();
- let mut pending_events = Vec::new();
-
- {
+ let (mut pending_failures, chan_restoration_res) = {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
- let short_to_id = &mut channel_state.short_to_id;
- let pending_msg_events = &mut channel_state.pending_msg_events;
- let channel = match channel_state.by_id.get_mut(&funding_txo.to_channel_id()) {
- Some(chan) => chan,
- None => return,
+ let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
+ hash_map::Entry::Occupied(chan) => chan,
+ hash_map::Entry::Vacant(_) => return,
};
- if !channel.is_awaiting_monitor_update() || channel.get_latest_monitor_update_id() != highest_applied_update_id {
+ if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
return;
}
- let (raa, commitment_update, order, pending_forwards, mut pending_failures, funding_broadcastable, funding_locked) = channel.monitor_updating_restored(&self.logger);
- if !pending_forwards.is_empty() {
- htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards));
- }
- htlc_failures.append(&mut pending_failures);
-
- macro_rules! handle_cs { () => {
- if let Some(update) = commitment_update {
- pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id: channel.get_counterparty_node_id(),
- updates: update,
- });
- }
- } }
- macro_rules! handle_raa { () => {
- if let Some(revoke_and_ack) = raa {
- pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
- node_id: channel.get_counterparty_node_id(),
- msg: revoke_and_ack,
- });
- }
- } }
- match order {
- RAACommitmentOrder::CommitmentFirst => {
- handle_cs!();
- handle_raa!();
- },
- RAACommitmentOrder::RevokeAndACKFirst => {
- handle_raa!();
- handle_cs!();
- },
- }
- if let Some(tx) = funding_broadcastable {
- log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
- self.tx_broadcaster.broadcast_transaction(&tx);
- }
- if let Some(msg) = funding_locked {
- pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
- node_id: channel.get_counterparty_node_id(),
- msg,
- });
- if let Some(announcement_sigs) = self.get_announcement_sigs(channel) {
- pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
- node_id: channel.get_counterparty_node_id(),
- msg: announcement_sigs,
- });
- }
- short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
- }
- }
-
- self.pending_events.lock().unwrap().append(&mut pending_events);
-
- for failure in htlc_failures.drain(..) {
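+ // Pull the post-monitor-update state out of the Channel and process it with the shared
+ // restoration macro while we still hold the channel_state lock.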
+ let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
+ (pending_failures, handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked))
+ };
+ post_handle_chan_restoration!(self, chan_restoration_res);
+ for failure in pending_failures.drain(..) {
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
}
- self.forward_htlcs(&mut htlc_forwards[..]);
-
- for res in close_results.drain(..) {
- self.finish_force_close_channel(res);
- }
}
fn internal_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
}
fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
+ let (htlcs_failed_forward, chan_restoration_res) = {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = &mut *channel_state_lock;
- match channel_state.by_id.entry(msg.channel_id) {
- hash_map::Entry::Occupied(mut chan) => {
- if chan.get().get_counterparty_node_id() != *counterparty_node_id {
- return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
- }
- // Currently, we expect all holding cell update_adds to be dropped on peer
- // disconnect, so Channel's reestablish will never hand us any holding cell
- // freed HTLCs to fail backwards. If in the future we no longer drop pending
- // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
- let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
- try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
- if let Some(monitor_update) = monitor_update_opt {
- if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
- // channel_reestablish doesn't guarantee the order it returns is sensical
- // for the messages it returns, but if we're setting what messages to
- // re-transmit on monitor update success, we need to make sure it is sane.
- if revoke_and_ack.is_none() {
- order = RAACommitmentOrder::CommitmentFirst;
- }
- if commitment_update.is_none() {
- order = RAACommitmentOrder::RevokeAndACKFirst;
- }
- return_monitor_err!(self, e, channel_state, chan, order, revoke_and_ack.is_some(), commitment_update.is_some());
- //TODO: Resend the funding_locked if needed once we get the monitor running again
- }
- }
- if let Some(msg) = funding_locked {
- channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
- node_id: counterparty_node_id.clone(),
- msg
- });
- }
- macro_rules! send_raa { () => {
- if let Some(msg) = revoke_and_ack {
- channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
- node_id: counterparty_node_id.clone(),
- msg
- });
+ match channel_state.by_id.entry(msg.channel_id) {
+ hash_map::Entry::Occupied(mut chan) => {
+ if chan.get().get_counterparty_node_id() != *counterparty_node_id {
+ return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
}
- } }
- macro_rules! send_cu { () => {
- if let Some(updates) = commitment_update {
- channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+ // Currently, we expect all holding cell update_adds to be dropped on peer
+ // disconnect, so Channel's reestablish will never hand us any holding cell
+ // freed HTLCs to fail backwards. If in the future we no longer drop pending
+ // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
+ let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, order, htlcs_failed_forward, shutdown) =
+ try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
+ if let Some(msg) = shutdown {
+ channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
node_id: counterparty_node_id.clone(),
- updates
+ msg,
});
}
- } }
- match order {
- RAACommitmentOrder::RevokeAndACKFirst => {
- send_raa!();
- send_cu!();
- },
- RAACommitmentOrder::CommitmentFirst => {
- send_cu!();
- send_raa!();
- },
- }
- if let Some(msg) = shutdown {
- channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
- node_id: counterparty_node_id.clone(),
- msg,
- });
- }
- Ok(())
- },
- hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
- }
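+ // channel_reestablish never produces pending HTLC forwards or a funding transaction to
+ // broadcast, so pass an empty Vec and None to the restoration macro.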
+ (htlcs_failed_forward, handle_chan_restoration_locked!(self, channel_state_lock, channel_state, chan, revoke_and_ack, commitment_update, order, monitor_update_opt, Vec::new(), None, funding_locked))
+ },
+ hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+ }
+ };
+ post_handle_chan_restoration!(self, chan_restoration_res);
+ self.fail_holding_cell_htlcs(htlcs_failed_forward, msg.channel_id);
+ Ok(())
}
/// Begin Update fee process. Allowed only on an outbound channel.
}
}
+ /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
+ /// This should only apply to HTLCs which were added to the holding cell because we were
+ /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
+ /// directly in `channel_monitor_updated` as it may introduce deadlocks by calling back into user
+ /// code to inform them of a channel monitor update.
+ fn check_free_holding_cells(&self) {
+ let mut failed_htlcs = Vec::new();
+ let mut handle_errors = Vec::new();
+ {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = &mut *channel_state_lock;
+ let by_id = &mut channel_state.by_id;
+ let short_to_id = &mut channel_state.short_to_id;
+ let pending_msg_events = &mut channel_state.pending_msg_events;
+
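+ // Walk every channel; returning false from the closure drops a channel which hit an
+ // unrecoverable error while freeing its holding cell.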
+ by_id.retain(|channel_id, chan| {
+ match chan.maybe_free_holding_cell_htlcs(&self.logger) {
+ Ok((None, ref htlcs)) if htlcs.is_empty() => true,
+ Ok((commitment_opt, holding_cell_failed_htlcs)) => {
+ failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+ if let Some((commitment_update, monitor_update)) = commitment_opt {
+ if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+ let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+ handle_errors.push((chan.get_counterparty_node_id(), res));
+ if close_channel { return false; }
+ } else {
+ pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+ node_id: chan.get_counterparty_node_id(),
+ updates: commitment_update,
+ });
+ }
+ }
+ true
+ },
+ Err(e) => {
+ let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
+ handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+ !close_channel
+ }
+ }
+ });
+ }
+ for (failures, channel_id) in failed_htlcs.drain(..) {
+ self.fail_holding_cell_htlcs(failures, channel_id);
+ }
+
+ for (counterparty_node_id, err) in handle_errors.drain(..) {
+ let _ = handle_error!(self, err, counterparty_node_id);
+ }
+ }
+
/// Handle a list of channel failures during a block_connected or block_disconnected call,
/// pushing the channel monitor update (if any) to the background events queue and removing the
/// Channel object.
// ChannelMonitors when clearing other events.
self.process_pending_monitor_events();
+ self.check_free_holding_cells();
+
let mut ret = Vec::new();
let mut channel_state = self.channel_state.lock().unwrap();
mem::swap(&mut ret, &mut channel_state.pending_msg_events);
fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
let mut failed_channels = Vec::new();
- let mut failed_payments = Vec::new();
let mut no_channels_remain = true;
{
let mut channel_state_lock = self.channel_state.lock().unwrap();
log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates", log_pubkey!(counterparty_node_id));
channel_state.by_id.retain(|_, chan| {
if chan.get_counterparty_node_id() == *counterparty_node_id {
- // Note that currently on channel reestablish we assert that there are no
- // holding cell add-HTLCs, so if in the future we stop removing uncommitted HTLCs
- // on peer disconnect here, there will need to be corresponding changes in
- // reestablish logic.
- let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
- if !failed_adds.is_empty() {
- let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe
- failed_payments.push((chan_update, failed_adds));
- }
+ chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
if chan.is_shutdown() {
if let Some(short_id) = chan.get_short_channel_id() {
short_to_id.remove(&short_id);
for failure in failed_channels.drain(..) {
self.finish_force_close_channel(failure);
}
- for (chan_update, mut htlc_sources) in failed_payments {
- for (htlc_source, payment_hash) in htlc_sources.drain(..) {
- self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
- }
- }
}
fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) {
mod tests {
use ln::channelmanager::PersistenceNotifier;
use std::sync::Arc;
- use std::sync::atomic::{AtomicBool, Ordering};
+ use core::sync::atomic::{AtomicBool, Ordering};
use std::thread;
- use std::time::Duration;
+ use core::time::Duration;
#[test]
fn test_wait_timeout() {