X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=1fa5bd4d519f3bc49e566c7f055b4b46f1a04c0f;hb=b66e3c53768f6bc7bc43064f1818051d22477c63;hp=808c7ffcd3ac2196e5961f406f13d5f347fee0c5;hpb=94853044fe94756de016ef76d6d34cb0ec7a34bd;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 808c7ffc..1fa5bd4d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -50,7 +50,7 @@ use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParame use crate::ln::msgs; use crate::ln::onion_utils; use crate::ln::onion_utils::HTLCFailReason; -use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT}; +use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; #[cfg(test)] use crate::ln::outbound_payment; use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment}; @@ -317,7 +317,7 @@ impl core::hash::Hash for HTLCSource { } } impl HTLCSource { - #[cfg(not(feature = "grind_signatures"))] + #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))] #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { @@ -633,6 +633,13 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the + /// user but which have not yet completed. + /// + /// Note that the channel may no longer exist. For example if the channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. + in_flight_monitor_updates: BTreeMap>, /// Map from a specific channel to some action(s) that should be taken when all pending /// [`ChannelMonitorUpdate`]s for the channel complete updating. /// @@ -668,6 +675,7 @@ impl PeerState { return false } self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() + && self.in_flight_monitor_updates.is_empty() } // Returns a count of all channels we have with this peer, including pending channels. @@ -744,7 +752,23 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = + ChannelManager< + &'a M, + &'b T, + &'c KeysManager, + &'c KeysManager, + &'c KeysManager, + &'d F, + &'e DefaultRouter< + &'f NetworkGraph<&'g L>, + &'g L, + &'h Mutex, &'g L>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L> + >, + &'g L + >; macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. @@ -1456,6 +1480,9 @@ pub struct ChannelDetails { /// /// [`confirmations_required`]: ChannelDetails::confirmations_required pub is_channel_ready: bool, + /// The stage of the channel's shutdown. + /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116. + pub channel_shutdown_state: Option, /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b) /// the peer is connected, and (c) the channel is not currently negotiating a shutdown. /// @@ -1495,10 +1522,13 @@ impl ChannelDetails { self.short_channel_id.or(self.outbound_scid_alias) } - fn from_channel_context(context: &ChannelContext, - best_block_height: u32, latest_features: InitFeatures) -> Self { - - let balance = context.get_available_balances(); + fn from_channel_context( + context: &ChannelContext, best_block_height: u32, latest_features: InitFeatures, + fee_estimator: &LowerBoundedFeeEstimator + ) -> Self + where F::Target: FeeEstimator + { + let balance = context.get_available_balances(fee_estimator); let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = context.get_holder_counterparty_selected_channel_reserve_satoshis(); ChannelDetails { @@ -1543,10 +1573,33 @@ impl ChannelDetails { inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()), inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(), config: Some(context.config()), + channel_shutdown_state: Some(context.shutdown_state()), } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Further information on the details of the channel shutdown. +/// Upon channels being forced closed (i.e. commitment transaction confirmation detected +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or +/// the channel will be removed shortly. +/// Also note, that in normal operation, peers could disconnect at any of these states +/// and require peer re-connection before making progress onto other states +pub enum ChannelShutdownState { + /// Channel has not sent or received a shutdown message. + NotShuttingDown, + /// Local node has sent a shutdown message for this channel. + ShutdownInitiated, + /// Shutdown message exchanges have concluded and the channels are in the midst of + /// resolving all existing open HTLCs before closing can continue. + ResolvingHTLCs, + /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates. + NegotiatingClosingFee, + /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about + /// to drop the channel. + ShutdownComplete, +} + /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments. /// These include payments that have yet to find a successful path, or have unresolved HTLCs. #[derive(Debug, PartialEq)] @@ -1811,7 +1864,7 @@ macro_rules! emit_channel_ready_event { } macro_rules! handle_monitor_update_completion { - ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { + ($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let mut updates = $chan.monitor_updating_restored(&$self.logger, &$self.node_signer, $self.genesis_hash, &$self.default_configuration, $self.best_block.read().unwrap().height()); @@ -1860,7 +1913,7 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); @@ -1871,13 +1924,13 @@ macro_rules! handle_new_monitor_update { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", log_bytes!($chan.context.channel_id()[..])); - Ok(()) + Ok(false) }, ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan.context.channel_id()[..])); update_maps_on_chan_removal!($self, &$chan.context); - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + let res = Err(MsgHandleErrInternal::from_finish_shutdown( "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(), $chan.context.get_user_id(), $chan.context.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok())); @@ -1885,16 +1938,42 @@ macro_rules! handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - $chan.complete_one_mon_update($update_id); - if $chan.no_monitor_updates_pending() { - handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); - } - Ok(()) + $completed; + Ok(true) }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + }; + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) + }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) + .or_insert_with(Vec::new); + // During startup, we push monitor updates as background events through to here in + // order to replay updates that were in-flight when we shut down. Thus, we have to + // filter for uniqueness here. + let idx = in_flight_updates.iter().position(|upd| upd == &$update) + .unwrap_or_else(|| { + in_flight_updates.push($update); + in_flight_updates.len() - 1 + }); + let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); + handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + } + }) + } }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) } } @@ -1973,6 +2052,8 @@ where { /// Constructs a new `ChannelManager` to hold several channels and route between them. /// + /// The current time or latest block header time can be provided as the `current_timestamp`. + /// /// This is the main "logic hub" for all channel-related actions, and implements /// [`ChannelMessageHandler`]. /// @@ -1986,7 +2067,11 @@ where /// [`block_connected`]: chain::Listen::block_connected /// [`block_disconnected`]: chain::Listen::block_disconnected /// [`params.best_block.block_hash`]: chain::BestBlock::block_hash - pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters) -> Self { + pub fn new( + fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, + node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters, + current_timestamp: u32, + ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); let inbound_pmt_key_material = node_signer.get_inbound_payment_key_material(); @@ -2018,7 +2103,7 @@ where probing_cookie_secret: entropy_source.get_secure_random_bytes(), - highest_seen_timestamp: AtomicUsize::new(0), + highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize), per_peer_state: FairRwLock::new(HashMap::new()), @@ -2158,7 +2243,7 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2184,17 +2269,17 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2227,7 +2312,8 @@ where return peer_state.channel_by_id .iter() .map(|(_, channel)| - ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone())) + ChannelDetails::from_channel_context(&channel.context, best_block_height, + features.clone(), &self.fee_estimator)) .collect(); } vec![] @@ -2309,9 +2395,8 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } if chan_entry.get().is_shutdown() { @@ -2526,9 +2611,11 @@ where } } - fn construct_recv_pending_htlc_info(&self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], - payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>) -> Result - { + fn construct_recv_pending_htlc_info( + &self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], payment_hash: PaymentHash, + amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>, allow_underpay: bool, + counterparty_skimmed_fee_msat: Option, + ) -> Result { // final_incorrect_cltv_expiry if hop_data.outgoing_cltv_value > cltv_expiry { return Err(ReceiveError { @@ -2554,7 +2641,10 @@ where msg: "The final CLTV expiry is too soon to handle", }); } - if hop_data.amt_to_forward > amt_msat { + if (!allow_underpay && hop_data.amt_to_forward > amt_msat) || + (allow_underpay && hop_data.amt_to_forward > + amt_msat.saturating_add(counterparty_skimmed_fee_msat.unwrap_or(0))) + { return Err(ReceiveError { err_code: 19, err_data: amt_msat.to_be_bytes().to_vec(), @@ -2621,7 +2711,7 @@ where incoming_amt_msat: Some(amt_msat), outgoing_amt_msat: hop_data.amt_to_forward, outgoing_cltv_value: hop_data.outgoing_cltv_value, - skimmed_fee_msat: None, + skimmed_fee_msat: counterparty_skimmed_fee_msat, }) } @@ -2841,7 +2931,7 @@ where fn construct_pending_htlc_status<'a>( &self, msg: &msgs::UpdateAddHTLC, shared_secret: [u8; 32], decoded_hop: onion_utils::Hop, - next_packet_pubkey_opt: Option> + allow_underpay: bool, next_packet_pubkey_opt: Option> ) -> PendingHTLCStatus { macro_rules! return_err { ($msg: expr, $err_code: expr, $data: expr) => { @@ -2859,7 +2949,9 @@ where match decoded_hop { onion_utils::Hop::Receive(next_hop_data) => { // OUR PAYMENT! - match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, None) { + match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, + msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat) + { Ok(info) => { // Note that we could obviously respond immediately with an update_fulfill_htlc // message, however that would leak that we are the recipient of this payment, so @@ -3025,22 +3117,21 @@ where session_priv: session_priv.clone(), first_hop_htlc_msat: htlc_msat, payment_id, - }, onion_packet, &self.logger); + }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) { - break Err(e); - } - if update_res == ChannelMonitorUpdateStatus::InProgress { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { + Err(e) => break Err(e), + Ok(false) => { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); + }, + Ok(true) => {}, } }, None => { }, @@ -3675,7 +3766,10 @@ where }; match next_hop { onion_utils::Hop::Receive(hop_data) => { - match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret)) { + match self.construct_recv_pending_htlc_info(hop_data, + incoming_shared_secret, payment_hash, outgoing_amt_msat, + outgoing_cltv_value, Some(phantom_shared_secret), false, None) + { Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, vec![(info, prev_htlc_id)])), Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret)) } @@ -3726,7 +3820,7 @@ where prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id: _, forward_info: PendingHTLCInfo { incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, - routing: PendingHTLCRouting::Forward { onion_packet, .. }, .. + routing: PendingHTLCRouting::Forward { onion_packet, .. }, skimmed_fee_msat, .. }, }) => { log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id); @@ -3740,7 +3834,8 @@ where }); if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, &self.logger) + onion_packet, skimmed_fee_msat, &self.fee_estimator, + &self.logger) { if let ChannelError::Ignore(msg) = e { log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); @@ -3923,11 +4018,16 @@ where htlcs.push(claimable_htlc); let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum(); htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat)); + let counterparty_skimmed_fee_msat = htlcs.iter() + .map(|htlc| htlc.counterparty_skimmed_fee_msat.unwrap_or(0)).sum(); + debug_assert!(total_value.saturating_sub(amount_msat) <= + counterparty_skimmed_fee_msat); new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, purpose: $purpose, amount_msat, + counterparty_skimmed_fee_msat, via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER), @@ -4067,8 +4167,7 @@ where let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { - let update_res = self.chain_monitor.update_channel(funding_txo, &update); - + let mut updated_chan = false; let res = { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { @@ -4076,12 +4175,18 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { - handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update.clone(), + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), } } else { Ok(()) } }; + if !updated_chan { + // TODO: Track this as in-flight even though the channel is closed. + let _ = self.chain_monitor.update_channel(funding_txo, &update); + } // TODO: If this channel has since closed, we're likely providing a payment // preimage update, which we must ensure is durable! We currently don't, // however, ensure that. @@ -4119,7 +4224,7 @@ where log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - chan.queue_update_fee(new_feerate, &self.logger); + chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger); NotifyOption::DoPersist } @@ -4662,9 +4767,7 @@ where log_bytes!(chan_id), action); peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); - let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, peer_state, per_peer_state, chan); if let Err(e) = res { // TODO: This is a *critical* error - we probably updated the outbound edge @@ -4872,12 +4975,18 @@ where hash_map::Entry::Vacant(_) => return, } }; - log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}", - highest_applied_update_id, channel.get().context.get_latest_monitor_update_id()); + let remaining_in_flight = + if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) { + pending.retain(|upd| upd.update_id > highest_applied_update_id); + pending.len() + } else { 0 }; + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.", + highest_applied_update_id, channel.get().context.get_latest_monitor_update_id(), + remaining_in_flight); if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id { return; } - handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. @@ -5088,9 +5197,13 @@ where return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) } else { if !self.default_configuration.manually_accept_inbound_channels { - if channel.context.get_channel_type().requires_zero_conf() { + let channel_type = channel.context.get_channel_type(); + if channel_type.requires_zero_conf() { return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } + if channel_type.requires_anchors_zero_fee_htlc_tx() { + return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone())); + } peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(user_channel_id), @@ -5200,8 +5313,9 @@ where let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); let chan = e.insert(chan); - let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, + per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, + { peer_state.channel_by_id.remove(&new_channel_id) }); // Note that we reply with the new channel_id in error messages if we gave up on the // channel, not the temporary_channel_id. This is compatible with ourselves, but the @@ -5213,7 +5327,7 @@ where if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { res.0 = None; } - res + res.map(|_| ()) } } } @@ -5234,7 +5348,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan); + let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the @@ -5243,7 +5357,7 @@ where shutdown_finish.0.take(); } } - res + res.map(|_| ()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -5331,9 +5445,8 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } break Ok(()); }, @@ -5421,7 +5534,8 @@ where let pending_forward_info = match decoded_hop_res { Ok((next_hop, shared_secret, next_packet_pk_opt)) => - self.construct_pending_htlc_status(msg, shared_secret, next_hop, next_packet_pk_opt), + self.construct_pending_htlc_status(msg, shared_secret, next_hop, + chan.get().context.config().accept_underpaying_htlcs, next_packet_pk_opt), Err(e) => PendingHTLCStatus::Fail(e) }; let create_pending_htlc_status = |chan: &Channel<::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| { @@ -5446,7 +5560,7 @@ where _ => pending_forward_info } }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan); + try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5528,10 +5642,8 @@ where let funding_txo = chan.get().context.get_funding_txo(); let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, + peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5665,12 +5777,10 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().context.get_funding_txo(); - let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) }; (htlcs_to_fail, res) }, @@ -5938,18 +6048,15 @@ where let counterparty_node_id = chan.context.get_counterparty_node_id(); let funding_txo = chan.context.get_funding_txo(); let (monitor_opt, holding_cell_failed_htlcs) = - chan.maybe_free_holding_cell_htlcs(&self.logger); + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger); if !holding_cell_failed_htlcs.is_empty() { failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let update_res = self.chain_monitor.update_channel( - funding_txo.expect("channel is live"), monitor_update); - let update_id = monitor_update.update_id; let channel_id: [u8; 32] = *channel_id; - let res = handle_new_monitor_update!(self, update_res, update_id, + let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, peer_state.channel_by_id.remove(&channel_id)); if res.is_err() { @@ -6059,37 +6166,6 @@ where } } - fn set_payment_hash_secret_map(&self, payment_hash: PaymentHash, payment_preimage: Option, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result { - assert!(invoice_expiry_delta_secs <= 60*60*24*365); // Sadly bitcoin timestamps are u32s, so panic before 2106 - - if min_value_msat.is_some() && min_value_msat.unwrap() > MAX_VALUE_MSAT { - return Err(APIError::APIMisuseError { err: format!("min_value_msat of {} greater than total 21 million bitcoin supply", min_value_msat.unwrap()) }); - } - - let payment_secret = PaymentSecret(self.entropy_source.get_secure_random_bytes()); - - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let mut payment_secrets = self.pending_inbound_payments.lock().unwrap(); - match payment_secrets.entry(payment_hash) { - hash_map::Entry::Vacant(e) => { - e.insert(PendingInboundPayment { - payment_secret, min_value_msat, payment_preimage, - user_payment_id: 0, // For compatibility with version 0.0.103 and earlier - // We assume that highest_seen_timestamp is pretty close to the current time - - // it's updated when we receive a new block with the maximum time we've seen in - // a header. It should never be more than two hours in the future. - // Thus, we add two hours here as a buffer to ensure we absolutely - // never fail a payment too early. - // Note that we assume that received blocks have reasonably up-to-date - // timestamps. - expiry_time: self.highest_seen_timestamp.load(Ordering::Acquire) as u64 + invoice_expiry_delta_secs as u64 + 7200, - }); - }, - hash_map::Entry::Occupied(_) => return Err(APIError::APIMisuseError { err: "Duplicate payment hash".to_owned() }), - } - Ok(payment_secret) - } - /// Gets a payment secret and payment hash for use in an invoice given to a third party wishing /// to pay us. /// @@ -6129,23 +6205,6 @@ where min_final_cltv_expiry_delta) } - /// Legacy version of [`create_inbound_payment`]. Use this method if you wish to share - /// serialized state with LDK node(s) running 0.0.103 and earlier. - /// - /// May panic if `invoice_expiry_delta_secs` is greater than one year. - /// - /// # Note - /// This method is deprecated and will be removed soon. - /// - /// [`create_inbound_payment`]: Self::create_inbound_payment - #[deprecated] - pub fn create_inbound_payment_legacy(&self, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result<(PaymentHash, PaymentSecret), APIError> { - let payment_preimage = PaymentPreimage(self.entropy_source.get_secure_random_bytes()); - let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let payment_secret = self.set_payment_hash_secret_map(payment_hash, Some(payment_preimage), min_value_msat, invoice_expiry_delta_secs)?; - Ok((payment_hash, payment_secret)) - } - /// Gets a [`PaymentSecret`] for a given [`PaymentHash`], for which the payment preimage is /// stored external to LDK. /// @@ -6199,20 +6258,6 @@ where min_final_cltv_expiry) } - /// Legacy version of [`create_inbound_payment_for_hash`]. Use this method if you wish to share - /// serialized state with LDK node(s) running 0.0.103 and earlier. - /// - /// May panic if `invoice_expiry_delta_secs` is greater than one year. - /// - /// # Note - /// This method is deprecated and will be removed soon. - /// - /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash - #[deprecated] - pub fn create_inbound_payment_for_hash_legacy(&self, payment_hash: PaymentHash, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result { - self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs) - } - /// Gets an LDK-generated payment preimage from a payment hash and payment secret that were /// previously returned from [`create_inbound_payment`]. /// @@ -6287,7 +6332,7 @@ where inflight_htlcs } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { let events = core::cell::RefCell::new(Vec::new()); let event_handler = |event: events::Event| events.borrow_mut().push(event); @@ -6353,9 +6398,7 @@ where if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); - let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); - let update_id = monitor_update.update_id; - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, peer_state_lck, peer_state, per_peer_state, chan) { errors.push((e, counterparty_node_id)); @@ -7076,6 +7119,7 @@ where inbound_v1_channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, @@ -7278,7 +7322,7 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType /// Fetches the set of [`InitFeatures`] flags which are provided by or required by /// [`ChannelManager`]. -pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { +pub fn provided_init_features(config: &UserConfig) -> InitFeatures { // Note that if new features are added here which other peers may (eventually) require, we // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. @@ -7294,11 +7338,8 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { features.set_channel_type_optional(); features.set_scid_privacy_optional(); features.set_zero_conf_optional(); - #[cfg(anchors)] - { // Attributes are not allowed on if expressions on our current MSRV of 1.41. - if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { - features.set_anchors_zero_fee_htlc_tx_optional(); - } + if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { + features.set_anchors_zero_fee_htlc_tx_optional(); } features } @@ -7355,6 +7396,7 @@ impl Writeable for ChannelDetails { (35, self.inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, self.feerate_sat_per_1000_weight, option), + (41, self.channel_shutdown_state, option), }); Ok(()) } @@ -7392,6 +7434,7 @@ impl Readable for ChannelDetails { (35, inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, feerate_sat_per_1000_weight, option), + (41, channel_shutdown_state, option), }); // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with @@ -7427,6 +7470,7 @@ impl Readable for ChannelDetails { inbound_htlc_minimum_msat, inbound_htlc_maximum_msat, feerate_sat_per_1000_weight, + channel_shutdown_state, }) } } @@ -7907,6 +7951,16 @@ where pending_claiming_payments = None; } + let mut in_flight_monitor_updates: Option>> = None; + for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { + for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() { + if !updates.is_empty() { + if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); } + in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates); + } + } + } + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -7917,6 +7971,7 @@ where (7, self.fake_scid_rand_bytes, required), (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), (9, htlc_purposes, vec_type), + (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), }); @@ -7966,6 +8021,14 @@ impl Readable for VecDeque<(Event, Option)> { } } +impl_writeable_tlv_based_enum!(ChannelShutdownState, + (0, NotShuttingDown) => {}, + (2, ShutdownInitiated) => {}, + (4, ResolvingHTLCs) => {}, + (6, NegotiatingClosingFee) => {}, + (8, ShutdownComplete) => {}, ; +); + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -8133,7 +8196,7 @@ where let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = VecDeque::new(); - let mut pending_background_events = Vec::new(); + let mut close_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config) @@ -8141,17 +8204,7 @@ where let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { - // If the channel is ahead of the monitor, return InvalidValue: - log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); - log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); - log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); - log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); - log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); - return Err(DecodeError::InvalidValue); - } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() { @@ -8162,7 +8215,7 @@ where log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update }); } @@ -8195,7 +8248,6 @@ where log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(), monitor.get_latest_update_id()); - channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } @@ -8242,7 +8294,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -8271,20 +8323,27 @@ where claimable_htlcs_list.push((payment_hash, previous_hops)); } - let peer_count: u64 = Readable::read(reader)?; - let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); - for _ in 0..peer_count { - let peer_pubkey = Readable::read(reader)?; - let peer_state = PeerState { - channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), + let peer_state_from_chans = |channel_by_id| { + PeerState { + channel_by_id, outbound_v1_channel_by_id: HashMap::new(), inbound_v1_channel_by_id: HashMap::new(), - latest_features: Readable::read(reader)?, + latest_features: InitFeatures::empty(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, - }; + } + }; + + let peer_count: u64 = Readable::read(reader)?; + let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); + for _ in 0..peer_count { + let peer_pubkey = Readable::read(reader)?; + let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); + let mut peer_state = peer_state_from_chans(peer_chans); + peer_state.latest_features = Readable::read(reader)?; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -8312,24 +8371,6 @@ where } } - for (node_id, peer_mtx) in per_peer_state.iter() { - let peer_state = peer_mtx.lock().unwrap(); - for (_, chan) in peer_state.channel_by_id.iter() { - for update in chan.uncompleted_unblocked_mon_updates() { - if let Some(funding_txo) = chan.context.get_funding_txo() { - log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", - update.update_id, log_bytes!(funding_txo.to_channel_id())); - pending_background_events.push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id: *node_id, funding_txo, update: update.clone(), - }); - } else { - return Err(DecodeError::InvalidValue); - } - } - } - } - let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; @@ -8366,6 +8407,7 @@ where let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; + let mut in_flight_monitor_updates: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -8376,6 +8418,7 @@ where (7, fake_scid_rand_bytes, option), (8, events_override, option), (9, claimable_htlc_purposes, vec_type), + (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), }); @@ -8409,6 +8452,103 @@ where retry_lock: Mutex::new(()) }; + // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`) + // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to + // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we + // replayed, and for each monitor update we have to replay we have to ensure there's a + // `ChannelMonitor` for it. + // + // In order to do so we first walk all of our live channels (so that we can check their + // state immediately after doing the update replays, when we have the `update_id`s + // available) and then walk any remaining in-flight updates. + // + // Because the actual handling of the in-flight updates is the same, it's macro'ized here: + let mut pending_background_events = Vec::new(); + macro_rules! handle_in_flight_updates { + ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr, + $monitor: expr, $peer_state: expr, $channel_info_log: expr + ) => { { + let mut max_in_flight_update_id = 0; + $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id()); + for update in $chan_in_flight_upds.iter() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", + update.update_id, $channel_info_log, log_bytes!($funding_txo.to_channel_id())); + max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: $counterparty_node_id, + funding_txo: $funding_txo, + update: update.clone(), + }); + } + if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { + log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!"); + return Err(DecodeError::InvalidValue); + } + max_in_flight_update_id + } } + } + + for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() { + let mut peer_state_lock = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_, chan) in peer_state.channel_by_id.iter() { + // Channels that were persisted have to be funded, otherwise they should have been + // discarded. + let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; + let monitor = args.channel_monitors.get(&funding_txo) + .expect("We already checked for monitor presence when loading channels"); + let mut max_in_flight_update_id = monitor.get_latest_update_id(); + if let Some(in_flight_upds) = &mut in_flight_monitor_updates { + if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { + max_in_flight_update_id = cmp::max(max_in_flight_update_id, + handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, + funding_txo, monitor, peer_state, "")); + } + } + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { + // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", + log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + if let Some(in_flight_upds) = in_flight_monitor_updates { + for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds { + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + // Now that we've removed all the in-flight monitor updates for channels that are + // still open, we need to replay any monitor updates that are for closed channels, + // creating the neccessary peer_state entries as we go. + let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| { + Mutex::new(peer_state_from_chans(HashMap::new())) + }); + let mut peer_state = peer_state_mutex.lock().unwrap(); + handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, + funding_txo, monitor, peer_state, "closed "); + } else { + log_error!(args.logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is missing.", + log_bytes!(funding_txo.to_channel_id())); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + // Note that we have to do the above replays before we push new monitor updates. + pending_background_events.append(&mut close_background_events); + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -8781,7 +8921,7 @@ mod tests { use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, RecipientOnionFields, InterceptId}; use crate::ln::functional_test_utils::*; - use crate::ln::msgs; + use crate::ln::msgs::{self, ErrorAction}; use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; @@ -9733,7 +9873,94 @@ mod tests { get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk); } - #[cfg(anchors)] + #[test] + fn reject_excessively_underpaying_htlcs() { + let chanmon_cfg = create_chanmon_cfgs(1); + let node_cfg = create_node_cfgs(1, &chanmon_cfg); + let node_chanmgr = create_node_chanmgrs(1, &node_cfg, &[None]); + let node = create_network(1, &node_cfg, &node_chanmgr); + let sender_intended_amt_msat = 100; + let extra_fee_msat = 10; + let hop_data = msgs::OnionHopData { + amt_to_forward: 100, + outgoing_cltv_value: 42, + format: msgs::OnionHopDataFormat::FinalNode { + keysend_preimage: None, + payment_metadata: None, + payment_data: Some(msgs::FinalOnionHopData { + payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat, + }), + } + }; + // Check that if the amount we received + the penultimate hop extra fee is less than the sender + // intended amount, we fail the payment. + if let Err(crate::ln::channelmanager::ReceiveError { err_code, .. }) = + node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), + sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat)) + { + assert_eq!(err_code, 19); + } else { panic!(); } + + // If amt_received + extra_fee is equal to the sender intended amount, we're fine. + let hop_data = msgs::OnionHopData { // This is the same hop_data as above, OnionHopData doesn't implement Clone + amt_to_forward: 100, + outgoing_cltv_value: 42, + format: msgs::OnionHopDataFormat::FinalNode { + keysend_preimage: None, + payment_metadata: None, + payment_data: Some(msgs::FinalOnionHopData { + payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat, + }), + } + }; + assert!(node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), + sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok()); + } + + #[test] + fn test_inbound_anchors_manual_acceptance() { + // Tests that we properly limit inbound channels when we have the manual-channel-acceptance + // flag set and (sometimes) accept channels as 0conf. + let mut anchors_cfg = test_default_channel_config(); + anchors_cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + + let mut anchors_manual_accept_cfg = anchors_cfg.clone(); + anchors_manual_accept_cfg.manually_accept_inbound_channels = true; + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, + &[Some(anchors_cfg.clone()), Some(anchors_cfg.clone()), Some(anchors_manual_accept_cfg.clone())]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); + + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + match &msg_events[0] { + MessageSendEvent::HandleError { node_id, action } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + match action { + ErrorAction::SendErrorMessage { msg } => + assert_eq!(msg.data, "No channels with anchor outputs accepted".to_owned()), + _ => panic!("Unexpected error action"), + } + } + _ => panic!("Unexpected event"), + } + + nodes[2].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + let events = nodes[2].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => + nodes[2].node.accept_inbound_channel(&temporary_channel_id, &nodes[0].node.get_our_node_id(), 23).unwrap(), + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[2], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + } + #[test] fn test_anchors_zero_fee_htlc_tx_fallback() { // Tests that if both nodes support anchors, but the remote node does not want to accept @@ -9838,7 +10065,7 @@ pub mod bench { use crate::routing::gossip::NetworkGraph; use crate::routing::router::{PaymentParameters, RouteParameters}; use crate::util::test_utils; - use crate::util::config::UserConfig; + use crate::util::config::{UserConfig, MaxDustHTLCExposure}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; @@ -9876,6 +10103,7 @@ pub mod bench { // Note that this is unrealistic as each payment send will require at least two fsync // calls per node. let network = bitcoin::Network::Testnet; + let genesis_block = bitcoin::blockdata::constants::genesis_block(network); let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@ -9884,6 +10112,7 @@ pub mod bench { let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); + config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253); config.channel_handshake_config.minimum_depth = 1; let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a); @@ -9892,7 +10121,7 @@ pub mod bench { let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_a_holder = ANodeHolder { node: &node_a }; let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); @@ -9902,7 +10131,7 @@ pub mod bench { let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_b_holder = ANodeHolder { node: &node_b }; node_a.peer_connected(&node_b.get_our_node_id(), &Init {