X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=1fa5bd4d519f3bc49e566c7f055b4b46f1a04c0f;hb=b66e3c53768f6bc7bc43064f1818051d22477c63;hp=4dec5cb1a3243d5cd8aeeec059df97fe416348ad;hpb=8f93e2dc944ebb9a07e2be97773649525121e31f;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4dec5cb1..1fa5bd4d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -50,7 +50,7 @@ use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParame use crate::ln::msgs; use crate::ln::onion_utils; use crate::ln::onion_utils::HTLCFailReason; -use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT}; +use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; #[cfg(test)] use crate::ln::outbound_payment; use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment}; @@ -131,6 +131,9 @@ pub(super) struct PendingHTLCInfo { /// may overshoot this in either case) pub(super) outgoing_amt_msat: u64, pub(super) outgoing_cltv_value: u32, + /// The fee being skimmed off the top of this HTLC. If this is a forward, it'll be the fee we are + /// skimming. If we're receiving this HTLC, it's the fee that our counterparty skimmed. + pub(super) skimmed_fee_msat: Option, } #[derive(Clone)] // See Channel::revoke_and_ack for why, tl;dr: Rust bug @@ -210,6 +213,8 @@ struct ClaimableHTLC { total_value_received: Option, /// The sender intended sum total of all MPP parts specified in the onion total_msat: u64, + /// The extra fee our counterparty skimmed off the top of this HTLC. + counterparty_skimmed_fee_msat: Option, } /// A payment identifier used to uniquely identify a payment to LDK. @@ -312,7 +317,7 @@ impl core::hash::Hash for HTLCSource { } } impl HTLCSource { - #[cfg(not(feature = "grind_signatures"))] + #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))] #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { @@ -628,6 +633,13 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the + /// user but which have not yet completed. + /// + /// Note that the channel may no longer exist. For example if the channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. + in_flight_monitor_updates: BTreeMap>, /// Map from a specific channel to some action(s) that should be taken when all pending /// [`ChannelMonitorUpdate`]s for the channel complete updating. /// @@ -663,6 +675,7 @@ impl PeerState { return false } self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() + && self.in_flight_monitor_updates.is_empty() } // Returns a count of all channels we have with this peer, including pending channels. @@ -739,7 +752,23 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = + ChannelManager< + &'a M, + &'b T, + &'c KeysManager, + &'c KeysManager, + &'c KeysManager, + &'d F, + &'e DefaultRouter< + &'f NetworkGraph<&'g L>, + &'g L, + &'h Mutex, &'g L>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L> + >, + &'g L + >; macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. @@ -1451,6 +1480,9 @@ pub struct ChannelDetails { /// /// [`confirmations_required`]: ChannelDetails::confirmations_required pub is_channel_ready: bool, + /// The stage of the channel's shutdown. + /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116. + pub channel_shutdown_state: Option, /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b) /// the peer is connected, and (c) the channel is not currently negotiating a shutdown. /// @@ -1490,10 +1522,13 @@ impl ChannelDetails { self.short_channel_id.or(self.outbound_scid_alias) } - fn from_channel_context(context: &ChannelContext, - best_block_height: u32, latest_features: InitFeatures) -> Self { - - let balance = context.get_available_balances(); + fn from_channel_context( + context: &ChannelContext, best_block_height: u32, latest_features: InitFeatures, + fee_estimator: &LowerBoundedFeeEstimator + ) -> Self + where F::Target: FeeEstimator + { + let balance = context.get_available_balances(fee_estimator); let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = context.get_holder_counterparty_selected_channel_reserve_satoshis(); ChannelDetails { @@ -1538,10 +1573,33 @@ impl ChannelDetails { inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()), inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(), config: Some(context.config()), + channel_shutdown_state: Some(context.shutdown_state()), } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Further information on the details of the channel shutdown. +/// Upon channels being forced closed (i.e. commitment transaction confirmation detected +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or +/// the channel will be removed shortly. +/// Also note, that in normal operation, peers could disconnect at any of these states +/// and require peer re-connection before making progress onto other states +pub enum ChannelShutdownState { + /// Channel has not sent or received a shutdown message. + NotShuttingDown, + /// Local node has sent a shutdown message for this channel. + ShutdownInitiated, + /// Shutdown message exchanges have concluded and the channels are in the midst of + /// resolving all existing open HTLCs before closing can continue. + ResolvingHTLCs, + /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates. + NegotiatingClosingFee, + /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about + /// to drop the channel. + ShutdownComplete, +} + /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments. /// These include payments that have yet to find a successful path, or have unresolved HTLCs. #[derive(Debug, PartialEq)] @@ -1806,7 +1864,7 @@ macro_rules! emit_channel_ready_event { } macro_rules! handle_monitor_update_completion { - ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { + ($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let mut updates = $chan.monitor_updating_restored(&$self.logger, &$self.node_signer, $self.genesis_hash, &$self.default_configuration, $self.best_block.read().unwrap().height()); @@ -1855,7 +1913,7 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); @@ -1866,13 +1924,13 @@ macro_rules! handle_new_monitor_update { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", log_bytes!($chan.context.channel_id()[..])); - Ok(()) + Ok(false) }, ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan.context.channel_id()[..])); update_maps_on_chan_removal!($self, &$chan.context); - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + let res = Err(MsgHandleErrInternal::from_finish_shutdown( "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(), $chan.context.get_user_id(), $chan.context.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok())); @@ -1880,16 +1938,42 @@ macro_rules! handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - $chan.complete_one_mon_update($update_id); - if $chan.no_monitor_updates_pending() { - handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); - } - Ok(()) + $completed; + Ok(true) }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + }; + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) + }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) + .or_insert_with(Vec::new); + // During startup, we push monitor updates as background events through to here in + // order to replay updates that were in-flight when we shut down. Thus, we have to + // filter for uniqueness here. + let idx = in_flight_updates.iter().position(|upd| upd == &$update) + .unwrap_or_else(|| { + in_flight_updates.push($update); + in_flight_updates.len() - 1 + }); + let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); + handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + } + }) + } }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) } } @@ -1968,6 +2052,8 @@ where { /// Constructs a new `ChannelManager` to hold several channels and route between them. /// + /// The current time or latest block header time can be provided as the `current_timestamp`. + /// /// This is the main "logic hub" for all channel-related actions, and implements /// [`ChannelMessageHandler`]. /// @@ -1981,7 +2067,11 @@ where /// [`block_connected`]: chain::Listen::block_connected /// [`block_disconnected`]: chain::Listen::block_disconnected /// [`params.best_block.block_hash`]: chain::BestBlock::block_hash - pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters) -> Self { + pub fn new( + fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, + node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters, + current_timestamp: u32, + ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); let inbound_pmt_key_material = node_signer.get_inbound_payment_key_material(); @@ -2013,7 +2103,7 @@ where probing_cookie_secret: entropy_source.get_secure_random_bytes(), - highest_seen_timestamp: AtomicUsize::new(0), + highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize), per_peer_state: FairRwLock::new(HashMap::new()), @@ -2153,7 +2243,7 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2179,17 +2269,17 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2222,7 +2312,8 @@ where return peer_state.channel_by_id .iter() .map(|(_, channel)| - ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone())) + ChannelDetails::from_channel_context(&channel.context, best_block_height, + features.clone(), &self.fee_estimator)) .collect(); } vec![] @@ -2304,9 +2395,8 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } if chan_entry.get().is_shutdown() { @@ -2521,9 +2611,11 @@ where } } - fn construct_recv_pending_htlc_info(&self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], - payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>) -> Result - { + fn construct_recv_pending_htlc_info( + &self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], payment_hash: PaymentHash, + amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>, allow_underpay: bool, + counterparty_skimmed_fee_msat: Option, + ) -> Result { // final_incorrect_cltv_expiry if hop_data.outgoing_cltv_value > cltv_expiry { return Err(ReceiveError { @@ -2549,7 +2641,10 @@ where msg: "The final CLTV expiry is too soon to handle", }); } - if hop_data.amt_to_forward > amt_msat { + if (!allow_underpay && hop_data.amt_to_forward > amt_msat) || + (allow_underpay && hop_data.amt_to_forward > + amt_msat.saturating_add(counterparty_skimmed_fee_msat.unwrap_or(0))) + { return Err(ReceiveError { err_code: 19, err_data: amt_msat.to_be_bytes().to_vec(), @@ -2616,15 +2711,18 @@ where incoming_amt_msat: Some(amt_msat), outgoing_amt_msat: hop_data.amt_to_forward, outgoing_cltv_value: hop_data.outgoing_cltv_value, + skimmed_fee_msat: counterparty_skimmed_fee_msat, }) } - fn decode_update_add_htlc_onion(&self, msg: &msgs::UpdateAddHTLC) -> PendingHTLCStatus { + fn decode_update_add_htlc_onion( + &self, msg: &msgs::UpdateAddHTLC + ) -> Result<(onion_utils::Hop, [u8; 32], Option>), HTLCFailureMsg> { macro_rules! return_malformed_err { ($msg: expr, $err_code: expr) => { { log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg); - return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { + return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { channel_id: msg.channel_id, htlc_id: msg.htlc_id, sha256_of_onion: Sha256::hash(&msg.onion_routing_packet.hop_data).into_inner(), @@ -2655,7 +2753,7 @@ where ($msg: expr, $err_code: expr, $data: expr) => { { log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg); - return PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { channel_id: msg.channel_id, htlc_id: msg.htlc_id, reason: HTLCFailReason::reason($err_code, $data.to_vec()) @@ -2674,11 +2772,186 @@ where return_err!(err_msg, err_code, &[0; 0]); }, }; + let (outgoing_scid, outgoing_amt_msat, outgoing_cltv_value, next_packet_pk_opt) = match next_hop { + onion_utils::Hop::Forward { + next_hop_data: msgs::OnionHopData { + format: msgs::OnionHopDataFormat::NonFinalNode { short_channel_id }, amt_to_forward, + outgoing_cltv_value, + }, .. + } => { + let next_pk = onion_utils::next_hop_packet_pubkey(&self.secp_ctx, + msg.onion_routing_packet.public_key.unwrap(), &shared_secret); + (short_channel_id, amt_to_forward, outgoing_cltv_value, Some(next_pk)) + }, + // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the + // inbound channel's state. + onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)), + onion_utils::Hop::Forward { + next_hop_data: msgs::OnionHopData { format: msgs::OnionHopDataFormat::FinalNode { .. }, .. }, .. + } => { + return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]); + } + }; - let pending_forward_info = match next_hop { + // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we + // can't hold the outbound peer state lock at the same time as the inbound peer state lock. + if let Some((err, mut code, chan_update)) = loop { + let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned(); + let forwarding_chan_info_opt = match id_option { + None => { // unknown_next_peer + // Note that this is likely a timing oracle for detecting whether an scid is a + // phantom or an intercept. + if (self.default_configuration.accept_intercept_htlcs && + fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.genesis_hash)) || + fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.genesis_hash) + { + None + } else { + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + }, + Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), + }; + let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) { + None => { + // Channel was removed. The short_to_chan_info and channel_by_id maps + // have no consistency guarantees. + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + }, + Some(chan) => chan + }; + if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { + // Note that the behavior here should be identical to the above block - we + // should NOT reveal the existence or non-existence of a private channel if + // we don't allow forwards outbound over them. + break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); + } + if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() { + // `option_scid_alias` (referred to in LDK as `scid_privacy`) means + // "refuse to forward unless the SCID alias was used", so we pretend + // we don't have the channel here. + break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); + } + let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok(); + + // Note that we could technically not return an error yet here and just hope + // that the connection is reestablished or monitor updated by the time we get + // around to doing the actual forward, but better to fail early if we can and + // hopefully an attacker trying to path-trace payments cannot make this occur + // on a small/per-node/per-channel scale. + if !chan.context.is_live() { // channel_disabled + // If the channel_update we're going to return is disabled (i.e. the + // peer has been disabled for some time), return `channel_disabled`, + // otherwise return `temporary_channel_failure`. + if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); + } else { + break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); + } + } + if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum + break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); + } + if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) { + break Some((err, code, chan_update_opt)); + } + chan_update_opt + } else { + if (msg.cltv_expiry as u64) < (outgoing_cltv_value) as u64 + MIN_CLTV_EXPIRY_DELTA as u64 { + // We really should set `incorrect_cltv_expiry` here but as we're not + // forwarding over a real channel we can't generate a channel_update + // for it. Instead we just return a generic temporary_node_failure. + break Some(( + "Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", + 0x2000 | 2, None, + )); + } + None + }; + + let cur_height = self.best_block.read().unwrap().height() + 1; + // Theoretically, channel counterparty shouldn't send us a HTLC expiring now, + // but we want to be robust wrt to counterparty packet sanitization (see + // HTLC_FAIL_BACK_BUFFER rationale). + if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon + break Some(("CLTV expiry is too close", 0x1000 | 14, chan_update_opt)); + } + if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far + break Some(("CLTV expiry is too far in the future", 21, None)); + } + // If the HTLC expires ~now, don't bother trying to forward it to our + // counterparty. They should fail it anyway, but we don't want to bother with + // the round-trips or risk them deciding they definitely want the HTLC and + // force-closing to ensure they get it if we're offline. + // We previously had a much more aggressive check here which tried to ensure + // our counterparty receives an HTLC which has *our* risk threshold met on it, + // but there is no need to do that, and since we're a bit conservative with our + // risk threshold it just results in failing to forward payments. + if (outgoing_cltv_value) as u64 <= (cur_height + LATENCY_GRACE_PERIOD_BLOCKS) as u64 { + break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, chan_update_opt)); + } + + break None; + } + { + let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); + if let Some(chan_update) = chan_update { + if code == 0x1000 | 11 || code == 0x1000 | 12 { + msg.amount_msat.write(&mut res).expect("Writes cannot fail"); + } + else if code == 0x1000 | 13 { + msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); + } + else if code == 0x1000 | 20 { + // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 + 0u16.write(&mut res).expect("Writes cannot fail"); + } + (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); + msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); + chan_update.write(&mut res).expect("Writes cannot fail"); + } else if code & 0x1000 == 0x1000 { + // If we're trying to return an error that requires a `channel_update` but + // we're forwarding to a phantom or intercept "channel" (i.e. cannot + // generate an update), just use the generic "temporary_node_failure" + // instead. + code = 0x2000 | 2; + } + return_err!(err, code, &res.0[..]); + } + Ok((next_hop, shared_secret, next_packet_pk_opt)) + } + + fn construct_pending_htlc_status<'a>( + &self, msg: &msgs::UpdateAddHTLC, shared_secret: [u8; 32], decoded_hop: onion_utils::Hop, + allow_underpay: bool, next_packet_pubkey_opt: Option> + ) -> PendingHTLCStatus { + macro_rules! return_err { + ($msg: expr, $err_code: expr, $data: expr) => { + { + log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg); + return PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason: HTLCFailReason::reason($err_code, $data.to_vec()) + .get_encrypted_failure_packet(&shared_secret, &None), + })); + } + } + } + match decoded_hop { onion_utils::Hop::Receive(next_hop_data) => { // OUR PAYMENT! - match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, None) { + match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, + msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat) + { Ok(info) => { // Note that we could obviously respond immediately with an update_fulfill_htlc // message, however that would leak that we are the recipient of this payment, so @@ -2690,10 +2963,10 @@ where } }, onion_utils::Hop::Forward { next_hop_data, next_hop_hmac, new_packet_bytes } => { - let new_pubkey = msg.onion_routing_packet.public_key.unwrap(); + debug_assert!(next_packet_pubkey_opt.is_some()); let outgoing_packet = msgs::OnionPacket { version: 0, - public_key: onion_utils::next_hop_packet_pubkey(&self.secp_ctx, new_pubkey, &shared_secret), + public_key: next_packet_pubkey_opt.unwrap_or(Err(secp256k1::Error::InvalidPublicKey)), hop_data: new_packet_bytes, hmac: next_hop_hmac.clone(), }; @@ -2715,150 +2988,10 @@ where incoming_amt_msat: Some(msg.amount_msat), outgoing_amt_msat: next_hop_data.amt_to_forward, outgoing_cltv_value: next_hop_data.outgoing_cltv_value, + skimmed_fee_msat: None, }) } - }; - - if let &PendingHTLCStatus::Forward(PendingHTLCInfo { ref routing, ref outgoing_amt_msat, ref outgoing_cltv_value, .. }) = &pending_forward_info { - // If short_channel_id is 0 here, we'll reject the HTLC as there cannot be a channel - // with a short_channel_id of 0. This is important as various things later assume - // short_channel_id is non-0 in any ::Forward. - if let &PendingHTLCRouting::Forward { ref short_channel_id, .. } = routing { - if let Some((err, mut code, chan_update)) = loop { - let id_option = self.short_to_chan_info.read().unwrap().get(short_channel_id).cloned(); - let forwarding_chan_info_opt = match id_option { - None => { // unknown_next_peer - // Note that this is likely a timing oracle for detecting whether an scid is a - // phantom or an intercept. - if (self.default_configuration.accept_intercept_htlcs && - fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash)) || - fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash) - { - None - } else { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - }, - Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), - }; - let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) { - None => { - // Channel was removed. The short_to_chan_info and channel_by_id maps - // have no consistency guarantees. - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - }, - Some(chan) => chan - }; - if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { - // Note that the behavior here should be identical to the above block - we - // should NOT reveal the existence or non-existence of a private channel if - // we don't allow forwards outbound over them. - break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); - } - if chan.context.get_channel_type().supports_scid_privacy() && *short_channel_id != chan.context.outbound_scid_alias() { - // `option_scid_alias` (referred to in LDK as `scid_privacy`) means - // "refuse to forward unless the SCID alias was used", so we pretend - // we don't have the channel here. - break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); - } - let chan_update_opt = self.get_channel_update_for_onion(*short_channel_id, chan).ok(); - - // Note that we could technically not return an error yet here and just hope - // that the connection is reestablished or monitor updated by the time we get - // around to doing the actual forward, but better to fail early if we can and - // hopefully an attacker trying to path-trace payments cannot make this occur - // on a small/per-node/per-channel scale. - if !chan.context.is_live() { // channel_disabled - // If the channel_update we're going to return is disabled (i.e. the - // peer has been disabled for some time), return `channel_disabled`, - // otherwise return `temporary_channel_failure`. - if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { - break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); - } else { - break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); - } - } - if *outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum - break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); - } - if let Err((err, code)) = chan.htlc_satisfies_config(&msg, *outgoing_amt_msat, *outgoing_cltv_value) { - break Some((err, code, chan_update_opt)); - } - chan_update_opt - } else { - if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + MIN_CLTV_EXPIRY_DELTA as u64 { - // We really should set `incorrect_cltv_expiry` here but as we're not - // forwarding over a real channel we can't generate a channel_update - // for it. Instead we just return a generic temporary_node_failure. - break Some(( - "Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", - 0x2000 | 2, None, - )); - } - None - }; - - let cur_height = self.best_block.read().unwrap().height() + 1; - // Theoretically, channel counterparty shouldn't send us a HTLC expiring now, - // but we want to be robust wrt to counterparty packet sanitization (see - // HTLC_FAIL_BACK_BUFFER rationale). - if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon - break Some(("CLTV expiry is too close", 0x1000 | 14, chan_update_opt)); - } - if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far - break Some(("CLTV expiry is too far in the future", 21, None)); - } - // If the HTLC expires ~now, don't bother trying to forward it to our - // counterparty. They should fail it anyway, but we don't want to bother with - // the round-trips or risk them deciding they definitely want the HTLC and - // force-closing to ensure they get it if we're offline. - // We previously had a much more aggressive check here which tried to ensure - // our counterparty receives an HTLC which has *our* risk threshold met on it, - // but there is no need to do that, and since we're a bit conservative with our - // risk threshold it just results in failing to forward payments. - if (*outgoing_cltv_value) as u64 <= (cur_height + LATENCY_GRACE_PERIOD_BLOCKS) as u64 { - break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, chan_update_opt)); - } - - break None; - } - { - let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); - if let Some(chan_update) = chan_update { - if code == 0x1000 | 11 || code == 0x1000 | 12 { - msg.amount_msat.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 13 { - msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 20 { - // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 - 0u16.write(&mut res).expect("Writes cannot fail"); - } - (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); - msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); - chan_update.write(&mut res).expect("Writes cannot fail"); - } else if code & 0x1000 == 0x1000 { - // If we're trying to return an error that requires a `channel_update` but - // we're forwarding to a phantom or intercept "channel" (i.e. cannot - // generate an update), just use the generic "temporary_node_failure" - // instead. - code = 0x2000 | 2; - } - return_err!(err, code, &res.0[..]); - } - } } - - pending_forward_info } /// Gets the current [`channel_update`] for the given channel. This first checks if the channel is @@ -2984,22 +3117,21 @@ where session_priv: session_priv.clone(), first_hop_htlc_msat: htlc_msat, payment_id, - }, onion_packet, &self.logger); + }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) { - break Err(e); - } - if update_res == ChannelMonitorUpdateStatus::InProgress { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { + Err(e) => break Err(e), + Ok(false) => { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); + }, + Ok(true) => {}, } }, None => { }, @@ -3451,13 +3583,16 @@ where /// [`ChannelManager::fail_intercepted_htlc`] MUST be called in response to the event. /// /// Note that LDK does not enforce fee requirements in `amt_to_forward_msat`, and will not stop - /// you from forwarding more than you received. + /// you from forwarding more than you received. See + /// [`HTLCIntercepted::expected_outbound_amount_msat`] for more on forwarding a different amount + /// than expected. /// /// Errors if the event was not handled in time, in which case the HTLC was automatically failed /// backwards. /// /// [`UserConfig::accept_intercept_htlcs`]: crate::util::config::UserConfig::accept_intercept_htlcs /// [`HTLCIntercepted`]: events::Event::HTLCIntercepted + /// [`HTLCIntercepted::expected_outbound_amount_msat`]: events::Event::HTLCIntercepted::expected_outbound_amount_msat // TODO: when we move to deciding the best outbound channel at forward time, only take // `next_node_id` and not `next_hop_channel_id` pub fn forward_intercepted_htlc(&self, intercept_id: InterceptId, next_hop_channel_id: &[u8; 32], next_node_id: PublicKey, amt_to_forward_msat: u64) -> Result<(), APIError> { @@ -3496,7 +3631,10 @@ where }, _ => unreachable!() // Only `PendingHTLCRouting::Forward`s are intercepted }; + let skimmed_fee_msat = + payment.forward_info.outgoing_amt_msat.saturating_sub(amt_to_forward_msat); let pending_htlc_info = PendingHTLCInfo { + skimmed_fee_msat: if skimmed_fee_msat == 0 { None } else { Some(skimmed_fee_msat) }, outgoing_amt_msat: amt_to_forward_msat, routing, ..payment.forward_info }; @@ -3566,7 +3704,7 @@ where prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, forward_info: PendingHTLCInfo { routing, incoming_shared_secret, payment_hash, outgoing_amt_msat, - outgoing_cltv_value, incoming_amt_msat: _ + outgoing_cltv_value, .. } }) => { macro_rules! failure_handler { @@ -3628,7 +3766,10 @@ where }; match next_hop { onion_utils::Hop::Receive(hop_data) => { - match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret)) { + match self.construct_recv_pending_htlc_info(hop_data, + incoming_shared_secret, payment_hash, outgoing_amt_msat, + outgoing_cltv_value, Some(phantom_shared_secret), false, None) + { Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, vec![(info, prev_htlc_id)])), Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret)) } @@ -3679,7 +3820,7 @@ where prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id: _, forward_info: PendingHTLCInfo { incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, - routing: PendingHTLCRouting::Forward { onion_packet, .. }, incoming_amt_msat: _, + routing: PendingHTLCRouting::Forward { onion_packet, .. }, skimmed_fee_msat, .. }, }) => { log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id); @@ -3693,7 +3834,8 @@ where }); if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, &self.logger) + onion_packet, skimmed_fee_msat, &self.fee_estimator, + &self.logger) { if let ChannelError::Ignore(msg) = e { log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); @@ -3737,7 +3879,8 @@ where HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, forward_info: PendingHTLCInfo { - routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, .. + routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, + skimmed_fee_msat, .. } }) => { let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret, mut onion_fields) = match routing { @@ -3778,6 +3921,7 @@ where total_msat: if let Some(data) = &payment_data { data.total_msat } else { outgoing_amt_msat }, cltv_expiry, onion_payload, + counterparty_skimmed_fee_msat: skimmed_fee_msat, }; let mut committed_to_claimable = false; @@ -3874,11 +4018,16 @@ where htlcs.push(claimable_htlc); let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum(); htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat)); + let counterparty_skimmed_fee_msat = htlcs.iter() + .map(|htlc| htlc.counterparty_skimmed_fee_msat.unwrap_or(0)).sum(); + debug_assert!(total_value.saturating_sub(amount_msat) <= + counterparty_skimmed_fee_msat); new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, purpose: $purpose, amount_msat, + counterparty_skimmed_fee_msat, via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER), @@ -4018,8 +4167,7 @@ where let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { - let update_res = self.chain_monitor.update_channel(funding_txo, &update); - + let mut updated_chan = false; let res = { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { @@ -4027,12 +4175,18 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { - handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update.clone(), + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), } } else { Ok(()) } }; + if !updated_chan { + // TODO: Track this as in-flight even though the channel is closed. + let _ = self.chain_monitor.update_channel(funding_txo, &update); + } // TODO: If this channel has since closed, we're likely providing a payment // preimage update, which we must ensure is durable! We currently don't, // however, ensure that. @@ -4070,7 +4224,7 @@ where log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - chan.queue_update_fee(new_feerate, &self.logger); + chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger); NotifyOption::DoPersist } @@ -4613,9 +4767,7 @@ where log_bytes!(chan_id), action); peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); - let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, peer_state, per_peer_state, chan); if let Err(e) = res { // TODO: This is a *critical* error - we probably updated the outbound edge @@ -4823,12 +4975,18 @@ where hash_map::Entry::Vacant(_) => return, } }; - log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}", - highest_applied_update_id, channel.get().context.get_latest_monitor_update_id()); + let remaining_in_flight = + if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) { + pending.retain(|upd| upd.update_id > highest_applied_update_id); + pending.len() + } else { 0 }; + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.", + highest_applied_update_id, channel.get().context.get_latest_monitor_update_id(), + remaining_in_flight); if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id { return; } - handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. @@ -4886,7 +5044,7 @@ where let is_only_peer_channel = peer_state.total_channel_count() == 1; match peer_state.inbound_v1_channel_by_id.entry(temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut channel) => { - if !channel.get().inbound_is_awaiting_accept() { + if !channel.get().is_awaiting_accept() { return Err(APIError::APIMisuseError { err: "The channel isn't currently awaiting to be accepted.".to_owned() }); } if accept_0conf { @@ -5039,9 +5197,13 @@ where return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) } else { if !self.default_configuration.manually_accept_inbound_channels { - if channel.context.get_channel_type().requires_zero_conf() { + let channel_type = channel.context.get_channel_type(); + if channel_type.requires_zero_conf() { return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } + if channel_type.requires_anchors_zero_fee_htlc_tx() { + return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone())); + } peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(user_channel_id), @@ -5151,8 +5313,9 @@ where let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); let chan = e.insert(chan); - let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, + per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, + { peer_state.channel_by_id.remove(&new_channel_id) }); // Note that we reply with the new channel_id in error messages if we gave up on the // channel, not the temporary_channel_id. This is compatible with ourselves, but the @@ -5164,7 +5327,7 @@ where if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { res.0 = None; } - res + res.map(|_| ()) } } } @@ -5185,7 +5348,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan); + let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the @@ -5194,7 +5357,7 @@ where shutdown_finish.0.take(); } } - res + res.map(|_| ()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -5282,9 +5445,8 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } break Ok(()); }, @@ -5358,7 +5520,7 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. - let pending_forward_info = self.decode_update_add_htlc_onion(msg); + let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5370,6 +5532,12 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { + let pending_forward_info = match decoded_hop_res { + Ok((next_hop, shared_secret, next_packet_pk_opt)) => + self.construct_pending_htlc_status(msg, shared_secret, next_hop, + chan.get().context.config().accept_underpaying_htlcs, next_packet_pk_opt), + Err(e) => PendingHTLCStatus::Fail(e) + }; let create_pending_htlc_status = |chan: &Channel<::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| { // If the update_add is completely bogus, the call will Err and we will close, // but if we've sent a shutdown and they haven't acknowledged it yet, we just @@ -5392,7 +5560,7 @@ where _ => pending_forward_info } }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan); + try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5474,10 +5642,8 @@ where let funding_txo = chan.get().context.get_funding_txo(); let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, + peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5611,12 +5777,10 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().context.get_funding_txo(); - let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) }; (htlcs_to_fail, res) }, @@ -5884,18 +6048,15 @@ where let counterparty_node_id = chan.context.get_counterparty_node_id(); let funding_txo = chan.context.get_funding_txo(); let (monitor_opt, holding_cell_failed_htlcs) = - chan.maybe_free_holding_cell_htlcs(&self.logger); + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger); if !holding_cell_failed_htlcs.is_empty() { failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let update_res = self.chain_monitor.update_channel( - funding_txo.expect("channel is live"), monitor_update); - let update_id = monitor_update.update_id; let channel_id: [u8; 32] = *channel_id; - let res = handle_new_monitor_update!(self, update_res, update_id, + let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, peer_state.channel_by_id.remove(&channel_id)); if res.is_err() { @@ -6005,37 +6166,6 @@ where } } - fn set_payment_hash_secret_map(&self, payment_hash: PaymentHash, payment_preimage: Option, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result { - assert!(invoice_expiry_delta_secs <= 60*60*24*365); // Sadly bitcoin timestamps are u32s, so panic before 2106 - - if min_value_msat.is_some() && min_value_msat.unwrap() > MAX_VALUE_MSAT { - return Err(APIError::APIMisuseError { err: format!("min_value_msat of {} greater than total 21 million bitcoin supply", min_value_msat.unwrap()) }); - } - - let payment_secret = PaymentSecret(self.entropy_source.get_secure_random_bytes()); - - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let mut payment_secrets = self.pending_inbound_payments.lock().unwrap(); - match payment_secrets.entry(payment_hash) { - hash_map::Entry::Vacant(e) => { - e.insert(PendingInboundPayment { - payment_secret, min_value_msat, payment_preimage, - user_payment_id: 0, // For compatibility with version 0.0.103 and earlier - // We assume that highest_seen_timestamp is pretty close to the current time - - // it's updated when we receive a new block with the maximum time we've seen in - // a header. It should never be more than two hours in the future. - // Thus, we add two hours here as a buffer to ensure we absolutely - // never fail a payment too early. - // Note that we assume that received blocks have reasonably up-to-date - // timestamps. - expiry_time: self.highest_seen_timestamp.load(Ordering::Acquire) as u64 + invoice_expiry_delta_secs as u64 + 7200, - }); - }, - hash_map::Entry::Occupied(_) => return Err(APIError::APIMisuseError { err: "Duplicate payment hash".to_owned() }), - } - Ok(payment_secret) - } - /// Gets a payment secret and payment hash for use in an invoice given to a third party wishing /// to pay us. /// @@ -6075,23 +6205,6 @@ where min_final_cltv_expiry_delta) } - /// Legacy version of [`create_inbound_payment`]. Use this method if you wish to share - /// serialized state with LDK node(s) running 0.0.103 and earlier. - /// - /// May panic if `invoice_expiry_delta_secs` is greater than one year. - /// - /// # Note - /// This method is deprecated and will be removed soon. - /// - /// [`create_inbound_payment`]: Self::create_inbound_payment - #[deprecated] - pub fn create_inbound_payment_legacy(&self, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result<(PaymentHash, PaymentSecret), APIError> { - let payment_preimage = PaymentPreimage(self.entropy_source.get_secure_random_bytes()); - let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let payment_secret = self.set_payment_hash_secret_map(payment_hash, Some(payment_preimage), min_value_msat, invoice_expiry_delta_secs)?; - Ok((payment_hash, payment_secret)) - } - /// Gets a [`PaymentSecret`] for a given [`PaymentHash`], for which the payment preimage is /// stored external to LDK. /// @@ -6145,20 +6258,6 @@ where min_final_cltv_expiry) } - /// Legacy version of [`create_inbound_payment_for_hash`]. Use this method if you wish to share - /// serialized state with LDK node(s) running 0.0.103 and earlier. - /// - /// May panic if `invoice_expiry_delta_secs` is greater than one year. - /// - /// # Note - /// This method is deprecated and will be removed soon. - /// - /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash - #[deprecated] - pub fn create_inbound_payment_for_hash_legacy(&self, payment_hash: PaymentHash, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result { - self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs) - } - /// Gets an LDK-generated payment preimage from a payment hash and payment secret that were /// previously returned from [`create_inbound_payment`]. /// @@ -6233,7 +6332,7 @@ where inflight_htlcs } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { let events = core::cell::RefCell::new(Vec::new()); let event_handler = |event: events::Event| events.borrow_mut().push(event); @@ -6299,9 +6398,7 @@ where if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); - let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); - let update_id = monitor_update.update_id; - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, peer_state_lck, peer_state, per_peer_state, chan) { errors.push((e, counterparty_node_id)); @@ -7022,6 +7119,7 @@ where inbound_v1_channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, @@ -7224,7 +7322,7 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType /// Fetches the set of [`InitFeatures`] flags which are provided by or required by /// [`ChannelManager`]. -pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { +pub fn provided_init_features(config: &UserConfig) -> InitFeatures { // Note that if new features are added here which other peers may (eventually) require, we // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. @@ -7240,11 +7338,8 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { features.set_channel_type_optional(); features.set_scid_privacy_optional(); features.set_zero_conf_optional(); - #[cfg(anchors)] - { // Attributes are not allowed on if expressions on our current MSRV of 1.41. - if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { - features.set_anchors_zero_fee_htlc_tx_optional(); - } + if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { + features.set_anchors_zero_fee_htlc_tx_optional(); } features } @@ -7301,6 +7396,7 @@ impl Writeable for ChannelDetails { (35, self.inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, self.feerate_sat_per_1000_weight, option), + (41, self.channel_shutdown_state, option), }); Ok(()) } @@ -7338,6 +7434,7 @@ impl Readable for ChannelDetails { (35, inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, feerate_sat_per_1000_weight, option), + (41, channel_shutdown_state, option), }); // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with @@ -7373,6 +7470,7 @@ impl Readable for ChannelDetails { inbound_htlc_minimum_msat, inbound_htlc_maximum_msat, feerate_sat_per_1000_weight, + channel_shutdown_state, }) } } @@ -7409,6 +7507,7 @@ impl_writeable_tlv_based!(PendingHTLCInfo, { (6, outgoing_amt_msat, required), (8, outgoing_cltv_value, required), (9, incoming_amt_msat, option), + (10, skimmed_fee_msat, option), }); @@ -7507,6 +7606,7 @@ impl Writeable for ClaimableHTLC { (5, self.total_value_received, option), (6, self.cltv_expiry, required), (8, keysend_preimage, option), + (10, self.counterparty_skimmed_fee_msat, option), }); Ok(()) } @@ -7514,24 +7614,19 @@ impl Writeable for ClaimableHTLC { impl Readable for ClaimableHTLC { fn read(reader: &mut R) -> Result { - let mut prev_hop = crate::util::ser::RequiredWrapper(None); - let mut value = 0; - let mut sender_intended_value = None; - let mut payment_data: Option = None; - let mut cltv_expiry = 0; - let mut total_value_received = None; - let mut total_msat = None; - let mut keysend_preimage: Option = None; - read_tlv_fields!(reader, { + _init_and_read_tlv_fields!(reader, { (0, prev_hop, required), (1, total_msat, option), - (2, value, required), + (2, value_ser, required), (3, sender_intended_value, option), - (4, payment_data, option), + (4, payment_data_opt, option), (5, total_value_received, option), (6, cltv_expiry, required), - (8, keysend_preimage, option) + (8, keysend_preimage, option), + (10, counterparty_skimmed_fee_msat, option), }); + let payment_data: Option = payment_data_opt; + let value = value_ser.0.unwrap(); let onion_payload = match keysend_preimage { Some(p) => { if payment_data.is_some() { @@ -7560,7 +7655,8 @@ impl Readable for ClaimableHTLC { total_value_received, total_msat: total_msat.unwrap(), onion_payload, - cltv_expiry, + cltv_expiry: cltv_expiry.0.unwrap(), + counterparty_skimmed_fee_msat, }) } } @@ -7855,6 +7951,16 @@ where pending_claiming_payments = None; } + let mut in_flight_monitor_updates: Option>> = None; + for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { + for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() { + if !updates.is_empty() { + if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); } + in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates); + } + } + } + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -7865,6 +7971,7 @@ where (7, self.fake_scid_rand_bytes, required), (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), (9, htlc_purposes, vec_type), + (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), }); @@ -7914,6 +8021,14 @@ impl Readable for VecDeque<(Event, Option)> { } } +impl_writeable_tlv_based_enum!(ChannelShutdownState, + (0, NotShuttingDown) => {}, + (2, ShutdownInitiated) => {}, + (4, ResolvingHTLCs) => {}, + (6, NegotiatingClosingFee) => {}, + (8, ShutdownComplete) => {}, ; +); + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -8081,7 +8196,7 @@ where let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = VecDeque::new(); - let mut pending_background_events = Vec::new(); + let mut close_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config) @@ -8089,17 +8204,7 @@ where let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { - // If the channel is ahead of the monitor, return InvalidValue: - log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); - log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); - log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); - log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); - log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); - return Err(DecodeError::InvalidValue); - } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() { @@ -8110,7 +8215,7 @@ where log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update }); } @@ -8143,7 +8248,6 @@ where log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(), monitor.get_latest_update_id()); - channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } @@ -8190,7 +8294,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -8219,20 +8323,27 @@ where claimable_htlcs_list.push((payment_hash, previous_hops)); } - let peer_count: u64 = Readable::read(reader)?; - let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); - for _ in 0..peer_count { - let peer_pubkey = Readable::read(reader)?; - let peer_state = PeerState { - channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), + let peer_state_from_chans = |channel_by_id| { + PeerState { + channel_by_id, outbound_v1_channel_by_id: HashMap::new(), inbound_v1_channel_by_id: HashMap::new(), - latest_features: Readable::read(reader)?, + latest_features: InitFeatures::empty(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, - }; + } + }; + + let peer_count: u64 = Readable::read(reader)?; + let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); + for _ in 0..peer_count { + let peer_pubkey = Readable::read(reader)?; + let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); + let mut peer_state = peer_state_from_chans(peer_chans); + peer_state.latest_features = Readable::read(reader)?; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -8260,24 +8371,6 @@ where } } - for (node_id, peer_mtx) in per_peer_state.iter() { - let peer_state = peer_mtx.lock().unwrap(); - for (_, chan) in peer_state.channel_by_id.iter() { - for update in chan.uncompleted_unblocked_mon_updates() { - if let Some(funding_txo) = chan.context.get_funding_txo() { - log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", - update.update_id, log_bytes!(funding_txo.to_channel_id())); - pending_background_events.push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id: *node_id, funding_txo, update: update.clone(), - }); - } else { - return Err(DecodeError::InvalidValue); - } - } - } - } - let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; @@ -8314,6 +8407,7 @@ where let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; + let mut in_flight_monitor_updates: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -8324,6 +8418,7 @@ where (7, fake_scid_rand_bytes, option), (8, events_override, option), (9, claimable_htlc_purposes, vec_type), + (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), }); @@ -8357,6 +8452,103 @@ where retry_lock: Mutex::new(()) }; + // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`) + // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to + // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we + // replayed, and for each monitor update we have to replay we have to ensure there's a + // `ChannelMonitor` for it. + // + // In order to do so we first walk all of our live channels (so that we can check their + // state immediately after doing the update replays, when we have the `update_id`s + // available) and then walk any remaining in-flight updates. + // + // Because the actual handling of the in-flight updates is the same, it's macro'ized here: + let mut pending_background_events = Vec::new(); + macro_rules! handle_in_flight_updates { + ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr, + $monitor: expr, $peer_state: expr, $channel_info_log: expr + ) => { { + let mut max_in_flight_update_id = 0; + $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id()); + for update in $chan_in_flight_upds.iter() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", + update.update_id, $channel_info_log, log_bytes!($funding_txo.to_channel_id())); + max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: $counterparty_node_id, + funding_txo: $funding_txo, + update: update.clone(), + }); + } + if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { + log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!"); + return Err(DecodeError::InvalidValue); + } + max_in_flight_update_id + } } + } + + for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() { + let mut peer_state_lock = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_, chan) in peer_state.channel_by_id.iter() { + // Channels that were persisted have to be funded, otherwise they should have been + // discarded. + let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; + let monitor = args.channel_monitors.get(&funding_txo) + .expect("We already checked for monitor presence when loading channels"); + let mut max_in_flight_update_id = monitor.get_latest_update_id(); + if let Some(in_flight_upds) = &mut in_flight_monitor_updates { + if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { + max_in_flight_update_id = cmp::max(max_in_flight_update_id, + handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, + funding_txo, monitor, peer_state, "")); + } + } + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { + // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", + log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + if let Some(in_flight_upds) = in_flight_monitor_updates { + for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds { + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + // Now that we've removed all the in-flight monitor updates for channels that are + // still open, we need to replay any monitor updates that are for closed channels, + // creating the neccessary peer_state entries as we go. + let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| { + Mutex::new(peer_state_from_chans(HashMap::new())) + }); + let mut peer_state = peer_state_mutex.lock().unwrap(); + handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, + funding_txo, monitor, peer_state, "closed "); + } else { + log_error!(args.logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is missing.", + log_bytes!(funding_txo.to_channel_id())); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + // Note that we have to do the above replays before we push new monitor updates. + pending_background_events.append(&mut close_background_events); + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -8729,7 +8921,7 @@ mod tests { use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, RecipientOnionFields, InterceptId}; use crate::ln::functional_test_utils::*; - use crate::ln::msgs; + use crate::ln::msgs::{self, ErrorAction}; use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; @@ -9681,7 +9873,94 @@ mod tests { get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk); } - #[cfg(anchors)] + #[test] + fn reject_excessively_underpaying_htlcs() { + let chanmon_cfg = create_chanmon_cfgs(1); + let node_cfg = create_node_cfgs(1, &chanmon_cfg); + let node_chanmgr = create_node_chanmgrs(1, &node_cfg, &[None]); + let node = create_network(1, &node_cfg, &node_chanmgr); + let sender_intended_amt_msat = 100; + let extra_fee_msat = 10; + let hop_data = msgs::OnionHopData { + amt_to_forward: 100, + outgoing_cltv_value: 42, + format: msgs::OnionHopDataFormat::FinalNode { + keysend_preimage: None, + payment_metadata: None, + payment_data: Some(msgs::FinalOnionHopData { + payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat, + }), + } + }; + // Check that if the amount we received + the penultimate hop extra fee is less than the sender + // intended amount, we fail the payment. + if let Err(crate::ln::channelmanager::ReceiveError { err_code, .. }) = + node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), + sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat)) + { + assert_eq!(err_code, 19); + } else { panic!(); } + + // If amt_received + extra_fee is equal to the sender intended amount, we're fine. + let hop_data = msgs::OnionHopData { // This is the same hop_data as above, OnionHopData doesn't implement Clone + amt_to_forward: 100, + outgoing_cltv_value: 42, + format: msgs::OnionHopDataFormat::FinalNode { + keysend_preimage: None, + payment_metadata: None, + payment_data: Some(msgs::FinalOnionHopData { + payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat, + }), + } + }; + assert!(node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), + sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok()); + } + + #[test] + fn test_inbound_anchors_manual_acceptance() { + // Tests that we properly limit inbound channels when we have the manual-channel-acceptance + // flag set and (sometimes) accept channels as 0conf. + let mut anchors_cfg = test_default_channel_config(); + anchors_cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + + let mut anchors_manual_accept_cfg = anchors_cfg.clone(); + anchors_manual_accept_cfg.manually_accept_inbound_channels = true; + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, + &[Some(anchors_cfg.clone()), Some(anchors_cfg.clone()), Some(anchors_manual_accept_cfg.clone())]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); + + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + match &msg_events[0] { + MessageSendEvent::HandleError { node_id, action } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + match action { + ErrorAction::SendErrorMessage { msg } => + assert_eq!(msg.data, "No channels with anchor outputs accepted".to_owned()), + _ => panic!("Unexpected error action"), + } + } + _ => panic!("Unexpected event"), + } + + nodes[2].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + let events = nodes[2].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => + nodes[2].node.accept_inbound_channel(&temporary_channel_id, &nodes[0].node.get_our_node_id(), 23).unwrap(), + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[2], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + } + #[test] fn test_anchors_zero_fee_htlc_tx_fallback() { // Tests that if both nodes support anchors, but the remote node does not want to accept @@ -9786,7 +10065,7 @@ pub mod bench { use crate::routing::gossip::NetworkGraph; use crate::routing::router::{PaymentParameters, RouteParameters}; use crate::util::test_utils; - use crate::util::config::UserConfig; + use crate::util::config::{UserConfig, MaxDustHTLCExposure}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; @@ -9824,6 +10103,7 @@ pub mod bench { // Note that this is unrealistic as each payment send will require at least two fsync // calls per node. let network = bitcoin::Network::Testnet; + let genesis_block = bitcoin::blockdata::constants::genesis_block(network); let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@ -9832,6 +10112,7 @@ pub mod bench { let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); + config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253); config.channel_handshake_config.minimum_depth = 1; let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a); @@ -9840,7 +10121,7 @@ pub mod bench { let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_a_holder = ANodeHolder { node: &node_a }; let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); @@ -9850,7 +10131,7 @@ pub mod bench { let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_b_holder = ANodeHolder { node: &node_b }; node_a.peer_connected(&node_b.get_our_node_id(), &Init {