X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=c7fc70a4593d66601b7d0d0e2acf8759196e5ffd;hb=c383f06538ac664fe3312daf765595ba106d5b98;hp=d427e35cc526cd7078cf6df9ee2cc69c42a34eef;hpb=25c1ad8e1918ee985f39b2df8714df96cad2eae6;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d427e35c..c7fc70a4 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -40,20 +40,20 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; -use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch}; +use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel}; use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; #[cfg(any(feature = "_test_utils", test))] -use crate::ln::features::InvoiceFeatures; +use crate::ln::features::Bolt11InvoiceFeatures; use crate::routing::gossip::NetworkGraph; -use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router}; +use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router}; use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use crate::ln::msgs; use crate::ln::onion_utils; use crate::ln::onion_utils::HTLCFailReason; -use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT}; +use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; #[cfg(test)] use crate::ln::outbound_payment; -use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment}; +use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs}; use crate::ln::wire::Encode; use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner}; use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate}; @@ -131,6 +131,9 @@ pub(super) struct PendingHTLCInfo { /// may overshoot this in either case) pub(super) outgoing_amt_msat: u64, pub(super) outgoing_cltv_value: u32, + /// The fee being skimmed off the top of this HTLC. If this is a forward, it'll be the fee we are + /// skimming. If we're receiving this HTLC, it's the fee that our counterparty skimmed. + pub(super) skimmed_fee_msat: Option, } #[derive(Clone)] // See Channel::revoke_and_ack for why, tl;dr: Rust bug @@ -210,6 +213,8 @@ struct ClaimableHTLC { total_value_received: Option, /// The sender intended sum total of all MPP parts specified in the onion total_msat: u64, + /// The extra fee our counterparty skimmed off the top of this HTLC. + counterparty_skimmed_fee_msat: Option, } /// A payment identifier used to uniquely identify a payment to LDK. @@ -312,7 +317,7 @@ impl core::hash::Hash for HTLCSource { } } impl HTLCSource { - #[cfg(not(feature = "grind_signatures"))] + #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))] #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { @@ -502,19 +507,19 @@ struct ClaimablePayments { /// running normally, and specifically must be processed before any other non-background /// [`ChannelMonitorUpdate`]s are applied. enum BackgroundEvent { - /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from - /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public - /// key to handle channel resumption, whereas if the channel has been force-closed we do not - /// need the counterparty node_id. + /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel. + /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the + /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the + /// channel has been force-closed we do not need the counterparty node_id. /// /// Note that any such events are lost on shutdown, so in general they must be updates which /// are regenerated on startup. - ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), + ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the /// channel to continue normal operation. /// /// In general this should be used rather than - /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the + /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`] /// error the other variant is acceptable. /// @@ -525,6 +530,13 @@ enum BackgroundEvent { funding_txo: OutPoint, update: ChannelMonitorUpdate }, + /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have + /// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending + /// on a channel. + MonitorUpdatesComplete { + counterparty_node_id: PublicKey, + channel_id: [u8; 32], + }, } #[derive(Debug)] @@ -607,17 +619,34 @@ impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction, /// State we hold per-peer. pub(super) struct PeerState { - /// `temporary_channel_id` or `channel_id` -> `channel`. + /// `channel_id` -> `Channel`. /// - /// Holds all channels where the peer is the counterparty. Once a channel has been assigned a - /// `channel_id`, the `temporary_channel_id` key in the map is updated and is replaced by the - /// `channel_id`. + /// Holds all funded channels where the peer is the counterparty. pub(super) channel_by_id: HashMap<[u8; 32], Channel>, + /// `temporary_channel_id` -> `OutboundV1Channel`. + /// + /// Holds all outbound V1 channels where the peer is the counterparty. Once an outbound channel has + /// been assigned a `channel_id`, the entry in this map is removed and one is created in + /// `channel_by_id`. + pub(super) outbound_v1_channel_by_id: HashMap<[u8; 32], OutboundV1Channel>, + /// `temporary_channel_id` -> `InboundV1Channel`. + /// + /// Holds all inbound V1 channels where the peer is the counterparty. Once an inbound channel has + /// been assigned a `channel_id`, the entry in this map is removed and one is created in + /// `channel_by_id`. + pub(super) inbound_v1_channel_by_id: HashMap<[u8; 32], InboundV1Channel>, /// The latest `InitFeatures` we heard from the peer. latest_features: InitFeatures, /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the + /// user but which have not yet completed. + /// + /// Note that the channel may no longer exist. For example if the channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. + in_flight_monitor_updates: BTreeMap>, /// Map from a specific channel to some action(s) that should be taken when all pending /// [`ChannelMonitorUpdate`]s for the channel complete updating. /// @@ -653,6 +682,21 @@ impl PeerState { return false } self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() + && self.in_flight_monitor_updates.is_empty() + } + + // Returns a count of all channels we have with this peer, including unfunded channels. + fn total_channel_count(&self) -> usize { + self.channel_by_id.len() + + self.outbound_v1_channel_by_id.len() + + self.inbound_v1_channel_by_id.len() + } + + // Returns a bool indicating if the given `channel_id` matches a channel we have with this peer. + fn has_channel(&self, channel_id: &[u8; 32]) -> bool { + self.channel_by_id.contains_key(channel_id) || + self.outbound_v1_channel_by_id.contains_key(channel_id) || + self.inbound_v1_channel_by_id.contains_key(channel_id) } } @@ -715,7 +759,23 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = + ChannelManager< + &'a M, + &'b T, + &'c KeysManager, + &'c KeysManager, + &'c KeysManager, + &'d F, + &'e DefaultRouter< + &'f NetworkGraph<&'g L>, + &'g L, + &'h Mutex, &'g L>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L> + >, + &'g L + >; macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. @@ -1061,7 +1121,6 @@ where /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool, persistence_notifier: Notifier, @@ -1427,6 +1486,9 @@ pub struct ChannelDetails { /// /// [`confirmations_required`]: ChannelDetails::confirmations_required pub is_channel_ready: bool, + /// The stage of the channel's shutdown. + /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116. + pub channel_shutdown_state: Option, /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b) /// the peer is connected, and (c) the channel is not currently negotiating a shutdown. /// @@ -1466,10 +1528,13 @@ impl ChannelDetails { self.short_channel_id.or(self.outbound_scid_alias) } - fn from_channel_context(context: &ChannelContext, - best_block_height: u32, latest_features: InitFeatures) -> Self { - - let balance = context.get_available_balances(); + fn from_channel_context( + context: &ChannelContext, best_block_height: u32, latest_features: InitFeatures, + fee_estimator: &LowerBoundedFeeEstimator + ) -> Self + where F::Target: FeeEstimator + { + let balance = context.get_available_balances(fee_estimator); let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = context.get_holder_counterparty_selected_channel_reserve_satoshis(); ChannelDetails { @@ -1514,10 +1579,33 @@ impl ChannelDetails { inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()), inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(), config: Some(context.config()), + channel_shutdown_state: Some(context.shutdown_state()), } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Further information on the details of the channel shutdown. +/// Upon channels being forced closed (i.e. commitment transaction confirmation detected +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or +/// the channel will be removed shortly. +/// Also note, that in normal operation, peers could disconnect at any of these states +/// and require peer re-connection before making progress onto other states +pub enum ChannelShutdownState { + /// Channel has not sent or received a shutdown message. + NotShuttingDown, + /// Local node has sent a shutdown message for this channel. + ShutdownInitiated, + /// Shutdown message exchanges have concluded and the channels are in the midst of + /// resolving all existing open HTLCs before closing can continue. + ResolvingHTLCs, + /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates. + NegotiatingClosingFee, + /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about + /// to drop the channel. + ShutdownComplete, +} + /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments. /// These include payments that have yet to find a successful path, or have unresolved HTLCs. #[derive(Debug, PartialEq)] @@ -1610,14 +1698,23 @@ macro_rules! handle_error { Err(err) }, } - } } + } }; + ($self: ident, $internal: expr) => { + match $internal { + Ok(res) => Ok(res), + Err((chan, msg_handle_err)) => { + let counterparty_node_id = chan.get_counterparty_node_id(); + handle_error!($self, Err(msg_handle_err), counterparty_node_id).map_err(|err| (chan, err)) + }, + } + }; } macro_rules! update_maps_on_chan_removal { - ($self: expr, $channel: expr) => {{ - $self.id_to_peer.lock().unwrap().remove(&$channel.context.channel_id()); + ($self: expr, $channel_context: expr) => {{ + $self.id_to_peer.lock().unwrap().remove(&$channel_context.channel_id()); let mut short_to_chan_info = $self.short_to_chan_info.write().unwrap(); - if let Some(short_id) = $channel.context.get_short_channel_id() { + if let Some(short_id) = $channel_context.get_short_channel_id() { short_to_chan_info.remove(&short_id); } else { // If the channel was never confirmed on-chain prior to its closure, remove the @@ -1626,10 +1723,10 @@ macro_rules! update_maps_on_chan_removal { // also don't want a counterparty to be able to trivially cause a memory leak by simply // opening a million channels with us which are closed before we ever reach the funding // stage. - let alias_removed = $self.outbound_scid_aliases.lock().unwrap().remove(&$channel.context.outbound_scid_alias()); + let alias_removed = $self.outbound_scid_aliases.lock().unwrap().remove(&$channel_context.outbound_scid_alias()); debug_assert!(alias_removed); } - short_to_chan_info.remove(&$channel.context.outbound_scid_alias()); + short_to_chan_info.remove(&$channel_context.outbound_scid_alias()); }} } @@ -1645,12 +1742,25 @@ macro_rules! convert_chan_err { }, ChannelError::Close(msg) => { log_error!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg); - update_maps_on_chan_removal!($self, $channel); - let shutdown_res = $channel.force_shutdown(true); + update_maps_on_chan_removal!($self, &$channel.context); + let shutdown_res = $channel.context.force_shutdown(true); (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.context.get_user_id(), shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok())) }, } + }; + ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, UNFUNDED) => { + match $err { + // We should only ever have `ChannelError::Close` when unfunded channels error. + // In any case, just close the channel. + ChannelError::Warn(msg) | ChannelError::Ignore(msg) | ChannelError::Close(msg) => { + log_error!($self.logger, "Closing unfunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg); + update_maps_on_chan_removal!($self, &$channel_context); + let shutdown_res = $channel_context.force_shutdown(false); + (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel_context.get_user_id(), + shutdown_res, None)) + }, + } } } @@ -1669,6 +1779,21 @@ macro_rules! break_chan_entry { } } +macro_rules! try_v1_outbound_chan_entry { + ($self: ident, $res: expr, $entry: expr) => { + match $res { + Ok(res) => res, + Err(e) => { + let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), UNFUNDED); + if drop { + $entry.remove_entry(); + } + return Err(res); + } + } + } +} + macro_rules! try_chan_entry { ($self: ident, $res: expr, $entry: expr) => { match $res { @@ -1688,7 +1813,7 @@ macro_rules! remove_channel { ($self: expr, $entry: expr) => { { let channel = $entry.remove_entry().1; - update_maps_on_chan_removal!($self, channel); + update_maps_on_chan_removal!($self, &channel.context); channel } } @@ -1745,7 +1870,7 @@ macro_rules! emit_channel_ready_event { } macro_rules! handle_monitor_update_completion { - ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { + ($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let mut updates = $chan.monitor_updating_restored(&$self.logger, &$self.node_signer, $self.genesis_hash, &$self.default_configuration, $self.best_block.read().unwrap().height()); @@ -1794,41 +1919,65 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); - #[cfg(debug_assertions)] { - debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); - } + debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); match $update_res { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", log_bytes!($chan.context.channel_id()[..])); - Ok(()) + Ok(false) }, ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan.context.channel_id()[..])); - update_maps_on_chan_removal!($self, $chan); - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + update_maps_on_chan_removal!($self, &$chan.context); + let res = Err(MsgHandleErrInternal::from_finish_shutdown( "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(), - $chan.context.get_user_id(), $chan.force_shutdown(false), + $chan.context.get_user_id(), $chan.context.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok())); $remove; res }, ChannelMonitorUpdateStatus::Completed => { - $chan.complete_one_mon_update($update_id); - if $chan.no_monitor_updates_pending() { - handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); - } - Ok(()) + $completed; + Ok(true) }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + }; + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) + }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) + .or_insert_with(Vec::new); + // During startup, we push monitor updates as background events through to here in + // order to replay updates that were in-flight when we shut down. Thus, we have to + // filter for uniqueness here. + let idx = in_flight_updates.iter().position(|upd| upd == &$update) + .unwrap_or_else(|| { + in_flight_updates.push($update); + in_flight_updates.len() - 1 + }); + let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); + handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + } + }) + } }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) } } @@ -1878,6 +2027,8 @@ macro_rules! process_events_body { let mut pending_events = $self.pending_events.lock().unwrap(); pending_events.drain(..num_events); processed_all_events = pending_events.is_empty(); + // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being + // updated here with the `pending_events` lock acquired. $self.pending_events_processor.store(false, Ordering::Release); } @@ -1907,6 +2058,8 @@ where { /// Constructs a new `ChannelManager` to hold several channels and route between them. /// + /// The current time or latest block header time can be provided as the `current_timestamp`. + /// /// This is the main "logic hub" for all channel-related actions, and implements /// [`ChannelMessageHandler`]. /// @@ -1920,7 +2073,11 @@ where /// [`block_connected`]: chain::Listen::block_connected /// [`block_disconnected`]: chain::Listen::block_disconnected /// [`params.best_block.block_hash`]: chain::BestBlock::block_hash - pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters) -> Self { + pub fn new( + fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, + node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters, + current_timestamp: u32, + ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); let inbound_pmt_key_material = node_signer.get_inbound_payment_key_material(); @@ -1952,7 +2109,7 @@ where probing_cookie_secret: entropy_source.get_secure_random_bytes(), - highest_seen_timestamp: AtomicUsize::new(0), + highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize), per_peer_state: FairRwLock::new(HashMap::new()), @@ -1960,7 +2117,6 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), @@ -2044,7 +2200,7 @@ where let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); let their_features = &peer_state.latest_features; let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration }; - match Channel::new_outbound(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key, + match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key, their_features, channel_value_satoshis, push_msat, user_channel_id, config, self.best_block.read().unwrap().height(), outbound_scid_alias) { @@ -2058,7 +2214,7 @@ where let res = channel.get_open_channel(self.genesis_hash.clone()); let temporary_channel_id = channel.context.channel_id(); - match peer_state.channel_by_id.entry(temporary_channel_id) { + match peer_state.outbound_v1_channel_by_id.entry(temporary_channel_id) { hash_map::Entry::Occupied(_) => { if cfg!(fuzzing) { return Err(APIError::APIMisuseError { err: "Fuzzy bad RNG".to_owned() }); @@ -2076,7 +2232,7 @@ where Ok(temporary_channel_id) } - fn list_channels_with_filter::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec { + fn list_funded_channels_with_filter::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec { // Allocate our best estimate of the number of channels we have in the `res` // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside @@ -2090,9 +2246,10 @@ where for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + // Only `Channels` in the channel_by_id map can be considered funded. for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2103,7 +2260,37 @@ where /// Gets the list of open channels, in random order. See [`ChannelDetails`] field documentation for /// more information. pub fn list_channels(&self) -> Vec { - self.list_channels_with_filter(|_| true) + // Allocate our best estimate of the number of channels we have in the `res` + // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without + // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside + // of the ChannelMonitor handling. Therefore reallocations may still occur, but is + // unlikely as the `short_to_chan_info` map often contains 2 entries for + // the same channel. + let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); + { + let best_block_height = self.best_block.read().unwrap().height(); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_channel_id, channel) in peer_state.channel_by_id.iter() { + let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, + peer_state.latest_features.clone(), &self.fee_estimator); + res.push(details); + } + for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { + let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, + peer_state.latest_features.clone(), &self.fee_estimator); + res.push(details); + } + for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { + let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, + peer_state.latest_features.clone(), &self.fee_estimator); + res.push(details); + } + } + } + res } /// Gets the list of usable channels, in random order. Useful as an argument to @@ -2116,7 +2303,7 @@ where // Note we use is_live here instead of usable which leads to somewhat confused // internal/external nomenclature, but that's ok cause that's probably what the user // really wanted anyway. - self.list_channels_with_filter(|&(_, ref channel)| channel.context.is_live()) + self.list_funded_channels_with_filter(|&(_, ref channel)| channel.context.is_live()) } /// Gets the list of channels we have with a given counterparty, in random order. @@ -2128,10 +2315,15 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let features = &peer_state.latest_features; + let chan_context_to_details = |context| { + ChannelDetails::from_channel_context(context, best_block_height, features.clone(), &self.fee_estimator) + }; return peer_state.channel_by_id .iter() - .map(|(_, channel)| - ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone())) + .map(|(_, channel)| &channel.context) + .chain(peer_state.outbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) + .chain(peer_state.inbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) + .map(chan_context_to_details) .collect(); } vec![] @@ -2166,19 +2358,19 @@ where } /// Helper function that issues the channel close events - fn issue_channel_close_events(&self, channel: &Channel<::Signer>, closure_reason: ClosureReason) { + fn issue_channel_close_events(&self, context: &ChannelContext<::Signer>, closure_reason: ClosureReason) { let mut pending_events_lock = self.pending_events.lock().unwrap(); - match channel.unbroadcasted_funding() { + match context.unbroadcasted_funding() { Some(transaction) => { pending_events_lock.push_back((events::Event::DiscardFunding { - channel_id: channel.context.channel_id(), transaction + channel_id: context.channel_id(), transaction }, None)); }, None => {}, } pending_events_lock.push_back((events::Event::ChannelClosed { - channel_id: channel.context.channel_id(), - user_channel_id: channel.context.get_user_id(), + channel_id: context.channel_id(), + user_channel_id: context.get_user_id(), reason: closure_reason }, None)); } @@ -2188,49 +2380,58 @@ where let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { - let per_peer_state = self.per_peer_state.read().unwrap(); + { + let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex = per_peer_state.get(counterparty_node_id) - .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_entry) => { - let funding_txo_opt = chan_entry.get().context.get_funding_txo(); - let their_features = &peer_state.latest_features; - let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() - .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; - failed_htlcs = htlcs; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg: shutdown_msg, - }); + match peer_state.channel_by_id.entry(channel_id.clone()) { + hash_map::Entry::Occupied(mut chan_entry) => { + let funding_txo_opt = chan_entry.get().context.get_funding_txo(); + let their_features = &peer_state.latest_features; + let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() + .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; + failed_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt.take() { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); - } + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg: shutdown_msg, + }); - if chan_entry.get().is_shutdown() { - let channel = remove_channel!(self, chan_entry); - if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: channel_update - }); + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt.take() { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } - self.issue_channel_close_events(&channel, ClosureReason::HolderForceClosed); - } - break Ok(()); - }, - hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), counterparty_node_id) }) + + if chan_entry.get().is_shutdown() { + let channel = remove_channel!(self, chan_entry); + if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: channel_update + }); + } + self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed); + } + break Ok(()); + }, + hash_map::Entry::Vacant(_) => (), + } } + // If we reach this point, it means that the channel_id either refers to an unfunded channel or + // it does not exist for this peer. Either way, we can attempt to force-close it. + // + // An appropriate error will be returned for non-existence of the channel if that's the case. + return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ()) + // TODO(dunxen): This is still not ideal as we're doing some extra lookups. + // Fix this with https://github.com/lightningdevkit/rust-lightning/issues/2422 }; for htlc_source in failed_htlcs.drain(..) { @@ -2330,30 +2531,46 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(peer_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; - let mut chan = { + let (update_opt, counterparty_node_id) = { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let closure_reason = if let Some(peer_msg) = peer_msg { + ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) } + } else { + ClosureReason::HolderForceClosed + }; if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { - if let Some(peer_msg) = peer_msg { - self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }); - } else { - self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed); - } - remove_channel!(self, chan) + log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); + self.issue_channel_close_events(&chan.get().context, closure_reason); + let mut chan = remove_channel!(self, chan); + self.finish_force_close_channel(chan.context.force_shutdown(broadcast)); + (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id()) + } else if let hash_map::Entry::Occupied(chan) = peer_state.outbound_v1_channel_by_id.entry(channel_id.clone()) { + log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); + self.issue_channel_close_events(&chan.get().context, closure_reason); + let mut chan = remove_channel!(self, chan); + self.finish_force_close_channel(chan.context.force_shutdown(false)); + // Unfunded channel has no update + (None, chan.context.get_counterparty_node_id()) + } else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) { + log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); + self.issue_channel_close_events(&chan.get().context, closure_reason); + let mut chan = remove_channel!(self, chan); + self.finish_force_close_channel(chan.context.force_shutdown(false)); + // Unfunded channel has no update + (None, chan.context.get_counterparty_node_id()) } else { return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) }); } }; - log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); - self.finish_force_close_channel(chan.force_shutdown(broadcast)); - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + if let Some(update) = update_opt { let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } - Ok(chan.context.get_counterparty_node_id()) + Ok(counterparty_node_id) } fn force_close_sending_error(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, broadcast: bool) -> Result<(), APIError> { @@ -2414,9 +2631,11 @@ where } } - fn construct_recv_pending_htlc_info(&self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], - payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>) -> Result - { + fn construct_recv_pending_htlc_info( + &self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], payment_hash: PaymentHash, + amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>, allow_underpay: bool, + counterparty_skimmed_fee_msat: Option, + ) -> Result { // final_incorrect_cltv_expiry if hop_data.outgoing_cltv_value > cltv_expiry { return Err(ReceiveError { @@ -2442,7 +2661,10 @@ where msg: "The final CLTV expiry is too soon to handle", }); } - if hop_data.amt_to_forward > amt_msat { + if (!allow_underpay && hop_data.amt_to_forward > amt_msat) || + (allow_underpay && hop_data.amt_to_forward > + amt_msat.saturating_add(counterparty_skimmed_fee_msat.unwrap_or(0))) + { return Err(ReceiveError { err_code: 19, err_data: amt_msat.to_be_bytes().to_vec(), @@ -2509,15 +2731,18 @@ where incoming_amt_msat: Some(amt_msat), outgoing_amt_msat: hop_data.amt_to_forward, outgoing_cltv_value: hop_data.outgoing_cltv_value, + skimmed_fee_msat: counterparty_skimmed_fee_msat, }) } - fn decode_update_add_htlc_onion(&self, msg: &msgs::UpdateAddHTLC) -> PendingHTLCStatus { + fn decode_update_add_htlc_onion( + &self, msg: &msgs::UpdateAddHTLC + ) -> Result<(onion_utils::Hop, [u8; 32], Option>), HTLCFailureMsg> { macro_rules! return_malformed_err { ($msg: expr, $err_code: expr) => { { log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg); - return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { + return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { channel_id: msg.channel_id, htlc_id: msg.htlc_id, sha256_of_onion: Sha256::hash(&msg.onion_routing_packet.hop_data).into_inner(), @@ -2548,7 +2773,7 @@ where ($msg: expr, $err_code: expr, $data: expr) => { { log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg); - return PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { channel_id: msg.channel_id, htlc_id: msg.htlc_id, reason: HTLCFailReason::reason($err_code, $data.to_vec()) @@ -2567,11 +2792,186 @@ where return_err!(err_msg, err_code, &[0; 0]); }, }; + let (outgoing_scid, outgoing_amt_msat, outgoing_cltv_value, next_packet_pk_opt) = match next_hop { + onion_utils::Hop::Forward { + next_hop_data: msgs::OnionHopData { + format: msgs::OnionHopDataFormat::NonFinalNode { short_channel_id }, amt_to_forward, + outgoing_cltv_value, + }, .. + } => { + let next_pk = onion_utils::next_hop_packet_pubkey(&self.secp_ctx, + msg.onion_routing_packet.public_key.unwrap(), &shared_secret); + (short_channel_id, amt_to_forward, outgoing_cltv_value, Some(next_pk)) + }, + // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the + // inbound channel's state. + onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)), + onion_utils::Hop::Forward { + next_hop_data: msgs::OnionHopData { format: msgs::OnionHopDataFormat::FinalNode { .. }, .. }, .. + } => { + return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]); + } + }; + + // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we + // can't hold the outbound peer state lock at the same time as the inbound peer state lock. + if let Some((err, mut code, chan_update)) = loop { + let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned(); + let forwarding_chan_info_opt = match id_option { + None => { // unknown_next_peer + // Note that this is likely a timing oracle for detecting whether an scid is a + // phantom or an intercept. + if (self.default_configuration.accept_intercept_htlcs && + fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.genesis_hash)) || + fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.genesis_hash) + { + None + } else { + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + }, + Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), + }; + let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) { + None => { + // Channel was removed. The short_to_chan_info and channel_by_id maps + // have no consistency guarantees. + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + }, + Some(chan) => chan + }; + if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { + // Note that the behavior here should be identical to the above block - we + // should NOT reveal the existence or non-existence of a private channel if + // we don't allow forwards outbound over them. + break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); + } + if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() { + // `option_scid_alias` (referred to in LDK as `scid_privacy`) means + // "refuse to forward unless the SCID alias was used", so we pretend + // we don't have the channel here. + break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); + } + let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok(); + + // Note that we could technically not return an error yet here and just hope + // that the connection is reestablished or monitor updated by the time we get + // around to doing the actual forward, but better to fail early if we can and + // hopefully an attacker trying to path-trace payments cannot make this occur + // on a small/per-node/per-channel scale. + if !chan.context.is_live() { // channel_disabled + // If the channel_update we're going to return is disabled (i.e. the + // peer has been disabled for some time), return `channel_disabled`, + // otherwise return `temporary_channel_failure`. + if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); + } else { + break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); + } + } + if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum + break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); + } + if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) { + break Some((err, code, chan_update_opt)); + } + chan_update_opt + } else { + if (msg.cltv_expiry as u64) < (outgoing_cltv_value) as u64 + MIN_CLTV_EXPIRY_DELTA as u64 { + // We really should set `incorrect_cltv_expiry` here but as we're not + // forwarding over a real channel we can't generate a channel_update + // for it. Instead we just return a generic temporary_node_failure. + break Some(( + "Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", + 0x2000 | 2, None, + )); + } + None + }; + + let cur_height = self.best_block.read().unwrap().height() + 1; + // Theoretically, channel counterparty shouldn't send us a HTLC expiring now, + // but we want to be robust wrt to counterparty packet sanitization (see + // HTLC_FAIL_BACK_BUFFER rationale). + if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon + break Some(("CLTV expiry is too close", 0x1000 | 14, chan_update_opt)); + } + if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far + break Some(("CLTV expiry is too far in the future", 21, None)); + } + // If the HTLC expires ~now, don't bother trying to forward it to our + // counterparty. They should fail it anyway, but we don't want to bother with + // the round-trips or risk them deciding they definitely want the HTLC and + // force-closing to ensure they get it if we're offline. + // We previously had a much more aggressive check here which tried to ensure + // our counterparty receives an HTLC which has *our* risk threshold met on it, + // but there is no need to do that, and since we're a bit conservative with our + // risk threshold it just results in failing to forward payments. + if (outgoing_cltv_value) as u64 <= (cur_height + LATENCY_GRACE_PERIOD_BLOCKS) as u64 { + break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, chan_update_opt)); + } + + break None; + } + { + let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); + if let Some(chan_update) = chan_update { + if code == 0x1000 | 11 || code == 0x1000 | 12 { + msg.amount_msat.write(&mut res).expect("Writes cannot fail"); + } + else if code == 0x1000 | 13 { + msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); + } + else if code == 0x1000 | 20 { + // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 + 0u16.write(&mut res).expect("Writes cannot fail"); + } + (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); + msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); + chan_update.write(&mut res).expect("Writes cannot fail"); + } else if code & 0x1000 == 0x1000 { + // If we're trying to return an error that requires a `channel_update` but + // we're forwarding to a phantom or intercept "channel" (i.e. cannot + // generate an update), just use the generic "temporary_node_failure" + // instead. + code = 0x2000 | 2; + } + return_err!(err, code, &res.0[..]); + } + Ok((next_hop, shared_secret, next_packet_pk_opt)) + } - let pending_forward_info = match next_hop { + fn construct_pending_htlc_status<'a>( + &self, msg: &msgs::UpdateAddHTLC, shared_secret: [u8; 32], decoded_hop: onion_utils::Hop, + allow_underpay: bool, next_packet_pubkey_opt: Option> + ) -> PendingHTLCStatus { + macro_rules! return_err { + ($msg: expr, $err_code: expr, $data: expr) => { + { + log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg); + return PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason: HTLCFailReason::reason($err_code, $data.to_vec()) + .get_encrypted_failure_packet(&shared_secret, &None), + })); + } + } + } + match decoded_hop { onion_utils::Hop::Receive(next_hop_data) => { // OUR PAYMENT! - match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, None) { + match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, + msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat) + { Ok(info) => { // Note that we could obviously respond immediately with an update_fulfill_htlc // message, however that would leak that we are the recipient of this payment, so @@ -2583,10 +2983,10 @@ where } }, onion_utils::Hop::Forward { next_hop_data, next_hop_hmac, new_packet_bytes } => { - let new_pubkey = msg.onion_routing_packet.public_key.unwrap(); + debug_assert!(next_packet_pubkey_opt.is_some()); let outgoing_packet = msgs::OnionPacket { version: 0, - public_key: onion_utils::next_hop_packet_pubkey(&self.secp_ctx, new_pubkey, &shared_secret), + public_key: next_packet_pubkey_opt.unwrap_or(Err(secp256k1::Error::InvalidPublicKey)), hop_data: new_packet_bytes, hmac: next_hop_hmac.clone(), }; @@ -2608,150 +3008,10 @@ where incoming_amt_msat: Some(msg.amount_msat), outgoing_amt_msat: next_hop_data.amt_to_forward, outgoing_cltv_value: next_hop_data.outgoing_cltv_value, + skimmed_fee_msat: None, }) } - }; - - if let &PendingHTLCStatus::Forward(PendingHTLCInfo { ref routing, ref outgoing_amt_msat, ref outgoing_cltv_value, .. }) = &pending_forward_info { - // If short_channel_id is 0 here, we'll reject the HTLC as there cannot be a channel - // with a short_channel_id of 0. This is important as various things later assume - // short_channel_id is non-0 in any ::Forward. - if let &PendingHTLCRouting::Forward { ref short_channel_id, .. } = routing { - if let Some((err, mut code, chan_update)) = loop { - let id_option = self.short_to_chan_info.read().unwrap().get(short_channel_id).cloned(); - let forwarding_chan_info_opt = match id_option { - None => { // unknown_next_peer - // Note that this is likely a timing oracle for detecting whether an scid is a - // phantom or an intercept. - if (self.default_configuration.accept_intercept_htlcs && - fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash)) || - fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash) - { - None - } else { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - }, - Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), - }; - let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) { - None => { - // Channel was removed. The short_to_chan_info and channel_by_id maps - // have no consistency guarantees. - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - }, - Some(chan) => chan - }; - if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { - // Note that the behavior here should be identical to the above block - we - // should NOT reveal the existence or non-existence of a private channel if - // we don't allow forwards outbound over them. - break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); - } - if chan.context.get_channel_type().supports_scid_privacy() && *short_channel_id != chan.context.outbound_scid_alias() { - // `option_scid_alias` (referred to in LDK as `scid_privacy`) means - // "refuse to forward unless the SCID alias was used", so we pretend - // we don't have the channel here. - break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); - } - let chan_update_opt = self.get_channel_update_for_onion(*short_channel_id, chan).ok(); - - // Note that we could technically not return an error yet here and just hope - // that the connection is reestablished or monitor updated by the time we get - // around to doing the actual forward, but better to fail early if we can and - // hopefully an attacker trying to path-trace payments cannot make this occur - // on a small/per-node/per-channel scale. - if !chan.context.is_live() { // channel_disabled - // If the channel_update we're going to return is disabled (i.e. the - // peer has been disabled for some time), return `channel_disabled`, - // otherwise return `temporary_channel_failure`. - if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { - break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); - } else { - break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); - } - } - if *outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum - break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); - } - if let Err((err, code)) = chan.htlc_satisfies_config(&msg, *outgoing_amt_msat, *outgoing_cltv_value) { - break Some((err, code, chan_update_opt)); - } - chan_update_opt - } else { - if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + MIN_CLTV_EXPIRY_DELTA as u64 { - // We really should set `incorrect_cltv_expiry` here but as we're not - // forwarding over a real channel we can't generate a channel_update - // for it. Instead we just return a generic temporary_node_failure. - break Some(( - "Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", - 0x2000 | 2, None, - )); - } - None - }; - - let cur_height = self.best_block.read().unwrap().height() + 1; - // Theoretically, channel counterparty shouldn't send us a HTLC expiring now, - // but we want to be robust wrt to counterparty packet sanitization (see - // HTLC_FAIL_BACK_BUFFER rationale). - if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon - break Some(("CLTV expiry is too close", 0x1000 | 14, chan_update_opt)); - } - if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far - break Some(("CLTV expiry is too far in the future", 21, None)); - } - // If the HTLC expires ~now, don't bother trying to forward it to our - // counterparty. They should fail it anyway, but we don't want to bother with - // the round-trips or risk them deciding they definitely want the HTLC and - // force-closing to ensure they get it if we're offline. - // We previously had a much more aggressive check here which tried to ensure - // our counterparty receives an HTLC which has *our* risk threshold met on it, - // but there is no need to do that, and since we're a bit conservative with our - // risk threshold it just results in failing to forward payments. - if (*outgoing_cltv_value) as u64 <= (cur_height + LATENCY_GRACE_PERIOD_BLOCKS) as u64 { - break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, chan_update_opt)); - } - - break None; - } - { - let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); - if let Some(chan_update) = chan_update { - if code == 0x1000 | 11 || code == 0x1000 | 12 { - msg.amount_msat.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 13 { - msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 20 { - // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 - 0u16.write(&mut res).expect("Writes cannot fail"); - } - (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); - msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); - chan_update.write(&mut res).expect("Writes cannot fail"); - } else if code & 0x1000 == 0x1000 { - // If we're trying to return an error that requires a `channel_update` but - // we're forwarding to a phantom or intercept "channel" (i.e. cannot - // generate an update), just use the generic "temporary_node_failure" - // instead. - code = 0x2000 | 2; - } - return_err!(err, code, &res.0[..]); - } - } } - - pending_forward_info } /// Gets the current [`channel_update`] for the given channel. This first checks if the channel is @@ -2798,6 +3058,7 @@ where self.get_channel_update_for_onion(short_channel_id, chan) } + fn get_channel_update_for_onion(&self, short_channel_id: u64, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Generating channel update for channel {}", log_bytes!(chan.context.channel_id())); let were_node_one = self.our_network_pubkey.serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..]; @@ -2836,10 +3097,17 @@ where #[cfg(test)] pub(crate) fn test_send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { let _lck = self.total_consistency_lock.read().unwrap(); - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes) + self.send_payment_along_path(SendAlongPathArgs { + path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, + session_priv_bytes + }) } - fn send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { + fn send_payment_along_path(&self, args: SendAlongPathArgs) -> Result<(), APIError> { + let SendAlongPathArgs { + path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, + session_priv_bytes + } = args; // The top-level caller should hold the total_consistency_lock read lock. debug_assert!(self.total_consistency_lock.try_write().is_err()); @@ -2876,22 +3144,21 @@ where session_priv: session_priv.clone(), first_hop_htlc_msat: htlc_msat, payment_id, - }, onion_packet, &self.logger); + }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) { - break Err(e); - } - if update_res == ChannelMonitorUpdateStatus::InProgress { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { + Err(e) => break Err(e), + Ok(false) => { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); + }, + Ok(true) => {}, } }, None => { }, @@ -2960,6 +3227,7 @@ where /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a /// different route unless you intend to pay twice! /// + /// [`RouteHop`]: crate::routing::router::RouteHop /// [`Event::PaymentSent`]: events::Event::PaymentSent /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs @@ -2969,9 +3237,9 @@ where let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments - .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, + &self.entropy_source, &self.node_signer, best_block_height, + |args| self.send_payment_along_path(args)) } /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on @@ -2983,18 +3251,16 @@ where .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, - &self.pending_events, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + &self.pending_events, |args| self.send_payment_along_path(args)) } #[cfg(test)] pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, + keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, + best_block_height, |args| self.send_payment_along_path(args)) } #[cfg(test)] @@ -3048,9 +3314,7 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment_with_route( route, payment_preimage, recipient_onion, payment_id, &self.entropy_source, - &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + &self.node_signer, best_block_height, |args| self.send_payment_along_path(args)) } /// Similar to [`ChannelManager::send_spontaneous_payment`], but will automatically find a route @@ -3066,9 +3330,7 @@ where self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, - &self.logger, &self.pending_events, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + &self.logger, &self.pending_events, |args| self.send_payment_along_path(args)) } /// Send a payment that is probing the given route for liquidity. We calculate the @@ -3077,9 +3339,9 @@ where pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, + &self.entropy_source, &self.node_signer, best_block_height, + |args| self.send_payment_along_path(args)) } /// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a @@ -3091,7 +3353,7 @@ where /// Handles the generation of a funding transaction, optionally (for tests) with a function /// which checks the correctness of the funding transaction given the associated channel. - fn funding_transaction_generated_intern::Signer>, &Transaction) -> Result>( + fn funding_transaction_generated_intern::Signer>, &Transaction) -> Result>( &self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput ) -> Result<(), APIError> { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -3100,21 +3362,24 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let (msg, chan) = match peer_state.channel_by_id.remove(temporary_channel_id) { - Some(mut chan) => { + let (chan, msg) = match peer_state.outbound_v1_channel_by_id.remove(temporary_channel_id) { + Some(chan) => { let funding_txo = find_funding_output(&chan, &funding_transaction)?; - let funding_res = chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger) - .map_err(|e| if let ChannelError::Close(msg) = e { - MsgHandleErrInternal::from_finish_shutdown(msg, chan.context.channel_id(), chan.context.get_user_id(), chan.force_shutdown(true), None) + let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger) + .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e { + let channel_id = chan.context.channel_id(); + let user_id = chan.context.get_user_id(); + let shutdown_res = chan.context.force_shutdown(false); + (chan, MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, user_id, shutdown_res, None)) } else { unreachable!(); }); match funding_res { - Ok(funding_msg) => (funding_msg, chan), - Err(_) => { + Ok((chan, funding_msg)) => (chan, funding_msg), + Err((chan, err)) => { mem::drop(peer_state_lock); mem::drop(per_peer_state); - let _ = handle_error!(self, funding_res, chan.context.get_counterparty_node_id()); + let _: Result<(), _> = handle_error!(self, Err(err), chan.context.get_counterparty_node_id()); return Err(APIError::ChannelUnavailable { err: "Signer refused to sign the initial commitment transaction".to_owned() }); @@ -3274,27 +3539,48 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for channel_id in channel_ids { - if !peer_state.channel_by_id.contains_key(channel_id) { + if !peer_state.has_channel(channel_id) { return Err(APIError::ChannelUnavailable { err: format!("Channel with ID {} was not found for the passed counterparty_node_id {}", log_bytes!(*channel_id), counterparty_node_id), }); - } + }; } for channel_id in channel_ids { - let channel = peer_state.channel_by_id.get_mut(channel_id).unwrap(); - let mut config = channel.context.config(); - config.apply(config_update); - if !channel.context.update_config(&config) { + if let Some(channel) = peer_state.channel_by_id.get_mut(channel_id) { + let mut config = channel.context.config(); + config.apply(config_update); + if !channel.context.update_config(&config) { + continue; + } + if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); + } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + node_id: channel.context.get_counterparty_node_id(), + msg, + }); + } continue; } - if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); - } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.context.get_counterparty_node_id(), - msg, + + let context = if let Some(channel) = peer_state.inbound_v1_channel_by_id.get_mut(channel_id) { + &mut channel.context + } else if let Some(channel) = peer_state.outbound_v1_channel_by_id.get_mut(channel_id) { + &mut channel.context + } else { + // This should not be reachable as we've already checked for non-existence in the previous channel_id loop. + debug_assert!(false); + return Err(APIError::ChannelUnavailable { + err: format!( + "Channel with ID {} for passed counterparty_node_id {} disappeared after we confirmed its existence - this should not be reachable!", + log_bytes!(*channel_id), counterparty_node_id), }); - } + }; + let mut config = context.config(); + config.apply(config_update); + // We update the config, but we MUST NOT broadcast a `channel_update` before `channel_ready` + // which would be the case for pending inbound/outbound channels. + context.update_config(&config); } Ok(()) } @@ -3340,13 +3626,16 @@ where /// [`ChannelManager::fail_intercepted_htlc`] MUST be called in response to the event. /// /// Note that LDK does not enforce fee requirements in `amt_to_forward_msat`, and will not stop - /// you from forwarding more than you received. + /// you from forwarding more than you received. See + /// [`HTLCIntercepted::expected_outbound_amount_msat`] for more on forwarding a different amount + /// than expected. /// /// Errors if the event was not handled in time, in which case the HTLC was automatically failed /// backwards. /// /// [`UserConfig::accept_intercept_htlcs`]: crate::util::config::UserConfig::accept_intercept_htlcs /// [`HTLCIntercepted`]: events::Event::HTLCIntercepted + /// [`HTLCIntercepted::expected_outbound_amount_msat`]: events::Event::HTLCIntercepted::expected_outbound_amount_msat // TODO: when we move to deciding the best outbound channel at forward time, only take // `next_node_id` and not `next_hop_channel_id` pub fn forward_intercepted_htlc(&self, intercept_id: InterceptId, next_hop_channel_id: &[u8; 32], next_node_id: PublicKey, amt_to_forward_msat: u64) -> Result<(), APIError> { @@ -3368,7 +3657,8 @@ where chan.context.get_short_channel_id().unwrap_or(chan.context.outbound_scid_alias()) }, None => return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) + err: format!("Funded channel with id {} not found for the passed counterparty node_id {}. Channel may still be opening.", + log_bytes!(*next_hop_channel_id), next_node_id) }) } }; @@ -3384,7 +3674,10 @@ where }, _ => unreachable!() // Only `PendingHTLCRouting::Forward`s are intercepted }; + let skimmed_fee_msat = + payment.forward_info.outgoing_amt_msat.saturating_sub(amt_to_forward_msat); let pending_htlc_info = PendingHTLCInfo { + skimmed_fee_msat: if skimmed_fee_msat == 0 { None } else { Some(skimmed_fee_msat) }, outgoing_amt_msat: amt_to_forward_msat, routing, ..payment.forward_info }; @@ -3454,7 +3747,7 @@ where prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, forward_info: PendingHTLCInfo { routing, incoming_shared_secret, payment_hash, outgoing_amt_msat, - outgoing_cltv_value, incoming_amt_msat: _ + outgoing_cltv_value, .. } }) => { macro_rules! failure_handler { @@ -3516,7 +3809,10 @@ where }; match next_hop { onion_utils::Hop::Receive(hop_data) => { - match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret)) { + match self.construct_recv_pending_htlc_info(hop_data, + incoming_shared_secret, payment_hash, outgoing_amt_msat, + outgoing_cltv_value, Some(phantom_shared_secret), false, None) + { Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, vec![(info, prev_htlc_id)])), Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret)) } @@ -3567,7 +3863,7 @@ where prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id: _, forward_info: PendingHTLCInfo { incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, - routing: PendingHTLCRouting::Forward { onion_packet, .. }, incoming_amt_msat: _, + routing: PendingHTLCRouting::Forward { onion_packet, .. }, skimmed_fee_msat, .. }, }) => { log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id); @@ -3581,7 +3877,8 @@ where }); if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, &self.logger) + onion_packet, skimmed_fee_msat, &self.fee_estimator, + &self.logger) { if let ChannelError::Ignore(msg) = e { log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); @@ -3625,7 +3922,8 @@ where HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, forward_info: PendingHTLCInfo { - routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, .. + routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, + skimmed_fee_msat, .. } }) => { let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret, mut onion_fields) = match routing { @@ -3666,6 +3964,7 @@ where total_msat: if let Some(data) = &payment_data { data.total_msat } else { outgoing_amt_msat }, cltv_expiry, onion_payload, + counterparty_skimmed_fee_msat: skimmed_fee_msat, }; let mut committed_to_claimable = false; @@ -3762,11 +4061,16 @@ where htlcs.push(claimable_htlc); let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum(); htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat)); + let counterparty_skimmed_fee_msat = htlcs.iter() + .map(|htlc| htlc.counterparty_skimmed_fee_msat.unwrap_or(0)).sum(); + debug_assert!(total_value.saturating_sub(amount_msat) <= + counterparty_skimmed_fee_msat); new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, purpose: $purpose, amount_msat, + counterparty_skimmed_fee_msat, via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER), @@ -3863,9 +4167,7 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, - &self.pending_events, &self.logger, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)); + &self.pending_events, &self.logger, |args| self.send_payment_along_path(args)); for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards.drain(..) { self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination); @@ -3889,7 +4191,6 @@ where fn process_background_events(&self) -> NotifyOption { debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread); - #[cfg(debug_assertions)] self.background_events_processed_since_startup.store(true, Ordering::Release); let mut background_events = Vec::new(); @@ -3900,14 +4201,13 @@ where for event in background_events.drain(..) { match event { - BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { + BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { // The channel has already been closed, so no use bothering to care about the // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { - let update_res = self.chain_monitor.update_channel(funding_txo, &update); - + let mut updated_chan = false; let res = { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { @@ -3915,12 +4215,18 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { - handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update.clone(), + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), } } else { Ok(()) } }; + if !updated_chan { + // TODO: Track this as in-flight even though the channel is closed. + let _ = self.chain_monitor.update_channel(funding_txo, &update); + } // TODO: If this channel has since closed, we're likely providing a payment // preimage update, which we must ensure is durable! We currently don't, // however, ensure that. @@ -3930,6 +4236,22 @@ where } let _ = handle_error!(self, res, counterparty_node_id); }, + BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) { + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan); + } else { + let update_actions = peer_state.monitor_update_blocked_actions + .remove(&channel_id).unwrap_or(Vec::new()); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(update_actions); + } + } + }, } } NotifyOption::DoPersist @@ -3958,7 +4280,7 @@ where log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - chan.queue_update_fee(new_feerate, &self.logger); + chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger); NotifyOption::DoPersist } @@ -3971,13 +4293,19 @@ where PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { let mut should_persist = self.process_background_events(); - let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for (chan_id, chan) in peer_state.channel_by_id.iter_mut() { + let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { + min_mempool_feerate + } else { + normal_feerate + }; let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } } @@ -3997,6 +4325,7 @@ where /// * Expiring a channel's previous [`ChannelConfig`] if necessary to only allow forwarding HTLCs /// with the current [`ChannelConfig`]. /// * Removing peers which have disconnected but and no longer have any channels. + /// * Force-closing and removing channels which have not completed establishment in a timely manner. /// /// Note that this may cause reentrancy through [`chain::Watch::update_channel`] calls or feerate /// estimate fetches. @@ -4007,7 +4336,8 @@ where PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { let mut should_persist = self.process_background_events(); - let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); @@ -4020,6 +4350,11 @@ where let pending_msg_events = &mut peer_state.pending_msg_events; let counterparty_node_id = *counterparty_node_id; peer_state.channel_by_id.retain(|chan_id, chan| { + let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { + min_mempool_feerate + } else { + normal_feerate + }; let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } @@ -4085,6 +4420,26 @@ where true }); + + let process_unfunded_channel_tick = | + chan_id: &[u8; 32], + chan_context: &mut ChannelContext<::Signer>, + unfunded_chan_context: &mut UnfundedChannelContext, + | { + chan_context.maybe_expire_prev_config(); + if unfunded_chan_context.should_expire_unfunded_channel() { + log_error!(self.logger, "Force-closing pending outbound channel {} for not establishing in a timely manner", log_bytes!(&chan_id[..])); + update_maps_on_chan_removal!(self, &chan_context); + self.issue_channel_close_events(&chan_context, ClosureReason::HolderForceClosed); + self.finish_force_close_channel(chan_context.force_shutdown(false)); + false + } else { + true + } + }; + peer_state.outbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context)); + peer_state.inbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context)); + if peer_state.ok_to_remove(true) { pending_peers_awaiting_removal.push(counterparty_node_id); } @@ -4475,6 +4830,11 @@ where -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! + // If we haven't yet run background events assume we're still deserializing and shouldn't + // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as + // `BackgroundEvent`s. + let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire); + { let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); @@ -4501,16 +4861,26 @@ where log_bytes!(chan_id), action); peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); - let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan); - if let Err(e) = res { - // TODO: This is a *critical* error - we probably updated the outbound edge - // of the HTLC's monitor with a preimage. We should retry this monitor - // update over and over again until morale improves. - log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); - return Err((counterparty_node_id, e)); + if !during_init { + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, + peer_state, per_peer_state, chan); + if let Err(e) = res { + // TODO: This is a *critical* error - we probably updated the outbound edge + // of the HTLC's monitor with a preimage. We should retry this monitor + // update over and over again until morale improves. + log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); + return Err((counterparty_node_id, e)); + } + } else { + // If we're running during init we cannot update a monitor directly - + // they probably haven't actually been loaded yet. Instead, push the + // monitor update as a background event. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, + funding_txo: prev_hop.outpoint, + update: monitor_update.clone(), + }); } } return Ok(()); @@ -4523,16 +4893,34 @@ where payment_preimage, }], }; - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); + + if !during_init { + // We update the ChannelMonitor on the backward link, after + // receiving an `update_fulfill_htlc` from the forward link. + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); + if update_res != ChannelMonitorUpdateStatus::Completed { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same event and try + // again on restart. + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, update_res); + } + } else { + // If we're running during init we cannot update a monitor directly - they probably + // haven't actually been loaded yet. Instead, push the monitor update as a background + // event. + // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the + // channel is already closed) we need to ultimately handle the monitor update + // completion action only after we've completed the monitor update. This is the only + // way to guarantee this update *will* be regenerated on startup (otherwise if this was + // from a forwarded HTLC the downstream preimage may be deleted before we claim + // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will + // complete the monitor update completion action from `completion_action`. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(( + prev_hop.outpoint, preimage_update, + ))); } // Note that we do process the completion action here. This totally could be a // duplicate claim, but we have no way of knowing without interrogating the @@ -4550,6 +4938,8 @@ where fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { + debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire), + "We don't support claim_htlc claims during startup - monitors may not be available yet"); self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(hop_data) => { @@ -4705,18 +5095,29 @@ where if peer_state_mutex_opt.is_none() { return } peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - let mut channel = { - match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ - hash_map::Entry::Occupied(chan) => chan, - hash_map::Entry::Vacant(_) => return, - } - }; - log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}", - highest_applied_update_id, channel.get().context.get_latest_monitor_update_id()); - if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id { + let channel = + if let Some(chan) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) { + chan + } else { + let update_actions = peer_state.monitor_update_blocked_actions + .remove(&funding_txo.to_channel_id()).unwrap_or(Vec::new()); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(update_actions); + return; + }; + let remaining_in_flight = + if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) { + pending.retain(|upd| upd.update_id > highest_applied_update_id); + pending.len() + } else { 0 }; + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.", + highest_applied_update_id, channel.context.get_latest_monitor_update_id(), + remaining_in_flight); + if !channel.is_awaiting_monitor_update() || channel.context.get_latest_monitor_update_id() != highest_applied_update_id { return; } - handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. @@ -4764,16 +5165,17 @@ where fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty()); + let peers_without_funded_channels = + self.peers_without_funded_channels(|peer| { peer.total_channel_count() > 0 }); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let is_only_peer_channel = peer_state.channel_by_id.len() == 1; - match peer_state.channel_by_id.entry(temporary_channel_id.clone()) { + let is_only_peer_channel = peer_state.total_channel_count() == 1; + match peer_state.inbound_v1_channel_by_id.entry(temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut channel) => { - if !channel.get().inbound_is_awaiting_accept() { + if !channel.get().is_awaiting_accept() { return Err(APIError::APIMisuseError { err: "The channel isn't currently awaiting to be accepted.".to_owned() }); } if accept_0conf { @@ -4832,7 +5234,7 @@ where let peer = peer_mtx.lock().unwrap(); if !maybe_count_peer(&*peer) { continue; } let num_unfunded_channels = Self::unfunded_channel_count(&peer, best_block_height); - if num_unfunded_channels == peer.channel_by_id.len() { + if num_unfunded_channels == peer.total_channel_count() { peers_without_funded_channels += 1; } } @@ -4845,12 +5247,19 @@ where ) -> usize { let mut num_unfunded_channels = 0; for (_, chan) in peer.channel_by_id.iter() { + // This covers non-zero-conf inbound `Channel`s that we are currently monitoring, but those + // which have not yet had any confirmations on-chain. if !chan.context.is_outbound() && chan.context.minimum_depth().unwrap_or(1) != 0 && chan.context.get_funding_tx_confirmations(best_block_height) == 0 { num_unfunded_channels += 1; } } + for (_, chan) in peer.inbound_v1_channel_by_id.iter() { + if chan.context.minimum_depth().unwrap_or(1) != 0 { + num_unfunded_channels += 1; + } + } num_unfunded_channels } @@ -4871,7 +5280,8 @@ where // Get the number of peers with channels, but without funded ones. We don't care too much // about peers that never open a channel, so we filter by peers that have at least one // channel, and then limit the number of those with unfunded channels. - let channeled_peers_without_funding = self.peers_without_funded_channels(|node| !node.channel_by_id.is_empty()); + let channeled_peers_without_funding = + self.peers_without_funded_channels(|node| node.total_channel_count() > 0); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -4885,7 +5295,7 @@ where // If this peer already has some channels, a new channel won't increase our number of peers // with unfunded channels, so as long as we aren't over the maximum number of unfunded // channels per-peer we can accept channels from a peer with existing ones. - if peer_state.channel_by_id.is_empty() && + if peer_state.total_channel_count() == 0 && channeled_peers_without_funding >= MAX_UNFUNDED_CHANNEL_PEERS && !self.default_configuration.manually_accept_inbound_channels { @@ -4901,7 +5311,7 @@ where msg.temporary_channel_id.clone())); } - let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider, + let mut channel = match InboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration, best_block_height, &self.logger, outbound_scid_alias) { @@ -4911,33 +5321,35 @@ where }, Ok(res) => res }; - match peer_state.channel_by_id.entry(channel.context.channel_id()) { - hash_map::Entry::Occupied(_) => { - self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias); - return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) - }, - hash_map::Entry::Vacant(entry) => { - if !self.default_configuration.manually_accept_inbound_channels { - if channel.context.get_channel_type().requires_zero_conf() { - return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); - } - peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { - node_id: counterparty_node_id.clone(), - msg: channel.accept_inbound_channel(user_channel_id), - }); - } else { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push_back((events::Event::OpenChannelRequest { - temporary_channel_id: msg.temporary_channel_id.clone(), - counterparty_node_id: counterparty_node_id.clone(), - funding_satoshis: msg.funding_satoshis, - push_msat: msg.push_msat, - channel_type: channel.context.get_channel_type().clone(), - }, None)); + let channel_id = channel.context.channel_id(); + let channel_exists = peer_state.has_channel(&channel_id); + if channel_exists { + self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias); + return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) + } else { + if !self.default_configuration.manually_accept_inbound_channels { + let channel_type = channel.context.get_channel_type(); + if channel_type.requires_zero_conf() { + return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } - - entry.insert(channel); + if channel_type.requires_anchors_zero_fee_htlc_tx() { + return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone())); + } + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + node_id: counterparty_node_id.clone(), + msg: channel.accept_inbound_channel(user_channel_id), + }); + } else { + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push_back((events::Event::OpenChannelRequest { + temporary_channel_id: msg.temporary_channel_id.clone(), + counterparty_node_id: counterparty_node_id.clone(), + funding_satoshis: msg.funding_satoshis, + push_msat: msg.push_msat, + channel_type: channel.context.get_channel_type().clone(), + }, None)); } + peer_state.inbound_v1_channel_by_id.insert(channel_id, channel); } Ok(()) } @@ -4952,9 +5364,9 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(msg.temporary_channel_id) { + match peer_state.outbound_v1_channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { - try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &peer_state.latest_features), chan); + try_v1_outbound_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &peer_state.latest_features), chan); (chan.get().context.get_value_satoshis(), chan.get().context.get_funding_redeemscript().to_v0_p2wsh(), chan.get().context.get_user_id()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) @@ -4983,12 +5395,24 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let ((funding_msg, monitor), chan) = - match peer_state.channel_by_id.entry(msg.temporary_channel_id) { - hash_map::Entry::Occupied(mut chan) => { - (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove()) + let (chan, funding_msg, monitor) = + match peer_state.inbound_v1_channel_by_id.remove(&msg.temporary_channel_id) { + Some(inbound_chan) => { + match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &self.logger) { + Ok(res) => res, + Err((mut inbound_chan, err)) => { + // We've already removed this inbound channel from the map in `PeerState` + // above so at this point we just need to clean up any lingering entries + // concerning this channel as it is safe to do so. + update_maps_on_chan_removal!(self, &inbound_chan.context); + let user_id = inbound_chan.context.get_user_id(); + let shutdown_res = inbound_chan.context.force_shutdown(false); + return Err(MsgHandleErrInternal::from_finish_shutdown(format!("{}", err), + msg.temporary_channel_id, user_id, shutdown_res, None)); + }, + } }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) + None => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) }; match peer_state.channel_by_id.entry(funding_msg.channel_id) { @@ -5020,8 +5444,9 @@ where let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); let chan = e.insert(chan); - let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, + per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, + { peer_state.channel_by_id.remove(&new_channel_id) }); // Note that we reply with the new channel_id in error messages if we gave up on the // channel, not the temporary_channel_id. This is compatible with ourselves, but the @@ -5033,7 +5458,7 @@ where if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { res.0 = None; } - res + res.map(|_| ()) } } } @@ -5054,7 +5479,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan); + let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the @@ -5063,7 +5488,7 @@ where shutdown_finish.0.take(); } } - res + res.map(|_| ()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -5125,39 +5550,50 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(msg.channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_entry) => { - - if !chan_entry.get().received_shutdown() { - log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", - log_bytes!(msg.channel_id), - if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); - } + // TODO(dunxen): Fix this duplication when we switch to a single map with enums as per + // https://github.com/lightningdevkit/rust-lightning/issues/2422 + if let hash_map::Entry::Occupied(chan_entry) = peer_state.outbound_v1_channel_by_id.entry(msg.channel_id.clone()) { + log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..])); + self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + let mut chan = remove_channel!(self, chan_entry); + self.finish_force_close_channel(chan.context.force_shutdown(false)); + return Ok(()); + } else if let hash_map::Entry::Occupied(chan_entry) = peer_state.inbound_v1_channel_by_id.entry(msg.channel_id.clone()) { + log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..])); + self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + let mut chan = remove_channel!(self, chan_entry); + self.finish_force_close_channel(chan.context.force_shutdown(false)); + return Ok(()); + } else if let hash_map::Entry::Occupied(mut chan_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) { + if !chan_entry.get().received_shutdown() { + log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", + log_bytes!(msg.channel_id), + if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); + } - let funding_txo_opt = chan_entry.get().context.get_funding_txo(); - let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, - chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); - dropped_htlcs = htlcs; + let funding_txo_opt = chan_entry.get().context.get_funding_txo(); + let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, + chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); + dropped_htlcs = htlcs; - if let Some(msg) = shutdown { - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg, - }); - } + if let Some(msg) = shutdown { + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg, + }); + } - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); - } - break Ok(()); - }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); + } + break Ok(()); + } else { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; for htlc_source in dropped_htlcs.drain(..) { @@ -5212,7 +5648,7 @@ where msg: update }); } - self.issue_channel_close_events(&chan, ClosureReason::CooperativeClosure); + self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure); } Ok(()) } @@ -5227,7 +5663,7 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. - let pending_forward_info = self.decode_update_add_htlc_onion(msg); + let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5239,6 +5675,12 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { + let pending_forward_info = match decoded_hop_res { + Ok((next_hop, shared_secret, next_packet_pk_opt)) => + self.construct_pending_htlc_status(msg, shared_secret, next_hop, + chan.get().context.config().accept_underpaying_htlcs, next_packet_pk_opt), + Err(e) => PendingHTLCStatus::Fail(e) + }; let create_pending_htlc_status = |chan: &Channel<::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| { // If the update_add is completely bogus, the call will Err and we will close, // but if we've sent a shutdown and they haven't acknowledged it yet, we just @@ -5261,7 +5703,7 @@ where _ => pending_forward_info } }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan); + try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5343,10 +5785,8 @@ where let funding_txo = chan.get().context.get_funding_txo(); let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, + peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5436,22 +5876,27 @@ where } } - // We only want to push a PendingHTLCsForwardable event if no others are queued. fn push_pending_forwards_ev(&self) { let mut pending_events = self.pending_events.lock().unwrap(); - let forward_ev_exists = pending_events.iter() - .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) - .is_some(); - if !forward_ev_exists { - pending_events.push_back((events::Event::PendingHTLCsForwardable { - time_forwardable: - Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), + let is_processing_events = self.pending_events_processor.load(Ordering::Acquire); + let num_forward_events = pending_events.iter().filter(|(ev, _)| + if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false } + ).count(); + // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing + // events is done in batches and they are not removed until we're done processing each + // batch. Since handling a `PendingHTLCsForwardable` event will call back into the + // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom + // payments will need an additional forwarding event before being claimed to make them look + // real by taking more time. + if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 { + pending_events.push_back((Event::PendingHTLCsForwardable { + time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), }, None)); } } /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote - /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event + /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of /// the [`ChannelMonitorUpdate`] in question. fn raa_monitor_updates_held(&self, @@ -5480,12 +5925,10 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().context.get_funding_txo(); - let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) }; (htlcs_to_fail, res) }, @@ -5686,7 +6129,7 @@ where let pending_msg_events = &mut peer_state.pending_msg_events; if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) { let mut chan = remove_channel!(self, chan_entry); - failed_channels.push(chan.force_shutdown(false)); + failed_channels.push(chan.context.force_shutdown(false)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -5697,7 +6140,7 @@ where } else { ClosureReason::CommitmentTxConfirmed }; - self.issue_channel_close_events(&chan, reason); + self.issue_channel_close_events(&chan.context, reason); pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.context.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { @@ -5753,18 +6196,15 @@ where let counterparty_node_id = chan.context.get_counterparty_node_id(); let funding_txo = chan.context.get_funding_txo(); let (monitor_opt, holding_cell_failed_htlcs) = - chan.maybe_free_holding_cell_htlcs(&self.logger); + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger); if !holding_cell_failed_htlcs.is_empty() { failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let update_res = self.chain_monitor.update_channel( - funding_txo.expect("channel is live"), monitor_update); - let update_id = monitor_update.update_id; let channel_id: [u8; 32] = *channel_id; - let res = handle_new_monitor_update!(self, update_res, update_id, + let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, peer_state.channel_by_id.remove(&channel_id)); if res.is_err() { @@ -5822,11 +6262,11 @@ where }); } - self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure); + self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure); log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); self.tx_broadcaster.broadcast_transactions(&[&tx]); - update_maps_on_chan_removal!(self, chan); + update_maps_on_chan_removal!(self, &chan.context); false } else { true } }, @@ -5874,37 +6314,6 @@ where } } - fn set_payment_hash_secret_map(&self, payment_hash: PaymentHash, payment_preimage: Option, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result { - assert!(invoice_expiry_delta_secs <= 60*60*24*365); // Sadly bitcoin timestamps are u32s, so panic before 2106 - - if min_value_msat.is_some() && min_value_msat.unwrap() > MAX_VALUE_MSAT { - return Err(APIError::APIMisuseError { err: format!("min_value_msat of {} greater than total 21 million bitcoin supply", min_value_msat.unwrap()) }); - } - - let payment_secret = PaymentSecret(self.entropy_source.get_secure_random_bytes()); - - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let mut payment_secrets = self.pending_inbound_payments.lock().unwrap(); - match payment_secrets.entry(payment_hash) { - hash_map::Entry::Vacant(e) => { - e.insert(PendingInboundPayment { - payment_secret, min_value_msat, payment_preimage, - user_payment_id: 0, // For compatibility with version 0.0.103 and earlier - // We assume that highest_seen_timestamp is pretty close to the current time - - // it's updated when we receive a new block with the maximum time we've seen in - // a header. It should never be more than two hours in the future. - // Thus, we add two hours here as a buffer to ensure we absolutely - // never fail a payment too early. - // Note that we assume that received blocks have reasonably up-to-date - // timestamps. - expiry_time: self.highest_seen_timestamp.load(Ordering::Acquire) as u64 + invoice_expiry_delta_secs as u64 + 7200, - }); - }, - hash_map::Entry::Occupied(_) => return Err(APIError::APIMisuseError { err: "Duplicate payment hash".to_owned() }), - } - Ok(payment_secret) - } - /// Gets a payment secret and payment hash for use in an invoice given to a third party wishing /// to pay us. /// @@ -5944,23 +6353,6 @@ where min_final_cltv_expiry_delta) } - /// Legacy version of [`create_inbound_payment`]. Use this method if you wish to share - /// serialized state with LDK node(s) running 0.0.103 and earlier. - /// - /// May panic if `invoice_expiry_delta_secs` is greater than one year. - /// - /// # Note - /// This method is deprecated and will be removed soon. - /// - /// [`create_inbound_payment`]: Self::create_inbound_payment - #[deprecated] - pub fn create_inbound_payment_legacy(&self, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result<(PaymentHash, PaymentSecret), APIError> { - let payment_preimage = PaymentPreimage(self.entropy_source.get_secure_random_bytes()); - let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let payment_secret = self.set_payment_hash_secret_map(payment_hash, Some(payment_preimage), min_value_msat, invoice_expiry_delta_secs)?; - Ok((payment_hash, payment_secret)) - } - /// Gets a [`PaymentSecret`] for a given [`PaymentHash`], for which the payment preimage is /// stored external to LDK. /// @@ -6014,20 +6406,6 @@ where min_final_cltv_expiry) } - /// Legacy version of [`create_inbound_payment_for_hash`]. Use this method if you wish to share - /// serialized state with LDK node(s) running 0.0.103 and earlier. - /// - /// May panic if `invoice_expiry_delta_secs` is greater than one year. - /// - /// # Note - /// This method is deprecated and will be removed soon. - /// - /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash - #[deprecated] - pub fn create_inbound_payment_for_hash_legacy(&self, payment_hash: PaymentHash, min_value_msat: Option, invoice_expiry_delta_secs: u32) -> Result { - self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs) - } - /// Gets an LDK-generated payment preimage from a payment hash and payment secret that were /// previously returned from [`create_inbound_payment`]. /// @@ -6102,7 +6480,7 @@ where inflight_htlcs } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { let events = core::cell::RefCell::new(Vec::new()); let event_handler = |event: events::Event| events.borrow_mut().push(event); @@ -6135,7 +6513,7 @@ where /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an /// [`Event`] being handled) completes, this should be called to restore the channel to normal /// operation. It will double-check that nothing *else* is also blocking the same channel from - /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly. + /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly. fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { let mut errors = Vec::new(); loop { @@ -6168,9 +6546,7 @@ where if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); - let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); - let update_id = monitor_update.update_id; - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, peer_state_lck, peer_state, per_peer_state, chan) { errors.push((e, counterparty_node_id)); @@ -6530,17 +6906,17 @@ where } } } else if let Err(reason) = res { - update_maps_on_chan_removal!(self, channel); + update_maps_on_chan_removal!(self, &channel.context); // It looks like our counterparty went on-chain or funding transaction was // reorged out of the main chain. Close the channel. - failed_channels.push(channel.force_shutdown(true)); + failed_channels.push(channel.context.force_shutdown(true)); if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } let reason_message = format!("{}", reason); - self.issue_channel_close_events(channel, reason); + self.issue_channel_close_events(&channel.context, reason); pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.context.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { @@ -6632,13 +7008,13 @@ where provided_node_features(&self.default_configuration) } - /// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by + /// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by /// [`ChannelManager`]. /// /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice" /// or not. Thus, this method is not public. #[cfg(any(feature = "_test_utils", test))] - pub fn invoice_features(&self) -> InvoiceFeatures { + pub fn invoice_features(&self) -> Bolt11InvoiceFeatures { provided_invoice_features(&self.default_configuration) } @@ -6790,12 +7166,22 @@ where peer_state.channel_by_id.retain(|_, chan| { chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); if chan.is_shutdown() { - update_maps_on_chan_removal!(self, chan); - self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer); + update_maps_on_chan_removal!(self, &chan.context); + self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); return false; } true }); + peer_state.inbound_v1_channel_by_id.retain(|_, chan| { + update_maps_on_chan_removal!(self, &chan.context); + self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); + false + }); + peer_state.outbound_v1_channel_by_id.retain(|_, chan| { + update_maps_on_chan_removal!(self, &chan.context); + self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); + false + }); pending_msg_events.retain(|msg| { match msg { // V1 Channel Establishment @@ -6877,8 +7263,11 @@ where } e.insert(Mutex::new(PeerState { channel_by_id: HashMap::new(), + outbound_v1_channel_by_id: HashMap::new(), + inbound_v1_channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, @@ -6905,37 +7294,20 @@ where log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); let per_peer_state = self.per_peer_state.read().unwrap(); - for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|_, chan| { - let retain = if chan.context.get_counterparty_node_id() == *counterparty_node_id { - if !chan.context.have_received_message() { - // If we created this (outbound) channel while we were disconnected from the - // peer we probably failed to send the open_channel message, which is now - // lost. We can't have had anything pending related to this channel, so we just - // drop it. - false - } else { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), - }); - true - } - } else { true }; - if retain && chan.context.get_counterparty_node_id() != *counterparty_node_id { - if let Some(msg) = chan.get_signed_channel_announcement(&self.node_signer, self.genesis_hash.clone(), self.best_block.read().unwrap().height(), &self.default_configuration) { - if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) { - pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement { - node_id: *counterparty_node_id, - msg, update_msg, - }); - } - } - } - retain + + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash) we don't need to bother closing unfunded channels and + // clearing their maps here. Instead we can just send queue channel_reestablish messages for + // channels in the channel_by_id map. + peer_state.channel_by_id.iter_mut().for_each(|(_, chan)| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); } //TODO: Also re-broadcast announcement_signatures @@ -6952,7 +7324,9 @@ where if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - peer_state.channel_by_id.keys().cloned().collect() + peer_state.channel_by_id.keys().cloned() + .chain(peer_state.outbound_v1_channel_by_id.keys().cloned()) + .chain(peer_state.inbound_v1_channel_by_id.keys().cloned()).collect() }; for channel_id in channel_ids { // Untrusted messages from peer, we throw away the error if id points to a non-existent channel @@ -6966,8 +7340,8 @@ where if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) { - if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) { + if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) { + if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) { peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: *counterparty_node_id, msg, @@ -7055,13 +7429,13 @@ pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures { provided_init_features(config).to_context() } -/// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by +/// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by /// [`ChannelManager`]. /// /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice" /// or not. Thus, this method is not public. #[cfg(any(feature = "_test_utils", test))] -pub(crate) fn provided_invoice_features(config: &UserConfig) -> InvoiceFeatures { +pub(crate) fn provided_invoice_features(config: &UserConfig) -> Bolt11InvoiceFeatures { provided_init_features(config).to_context() } @@ -7079,7 +7453,7 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType /// Fetches the set of [`InitFeatures`] flags which are provided by or required by /// [`ChannelManager`]. -pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { +pub fn provided_init_features(config: &UserConfig) -> InitFeatures { // Note that if new features are added here which other peers may (eventually) require, we // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. @@ -7095,11 +7469,8 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { features.set_channel_type_optional(); features.set_scid_privacy_optional(); features.set_zero_conf_optional(); - #[cfg(anchors)] - { // Attributes are not allowed on if expressions on our current MSRV of 1.41. - if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { - features.set_anchors_zero_fee_htlc_tx_optional(); - } + if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { + features.set_anchors_zero_fee_htlc_tx_optional(); } features } @@ -7156,6 +7527,7 @@ impl Writeable for ChannelDetails { (35, self.inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, self.feerate_sat_per_1000_weight, option), + (41, self.channel_shutdown_state, option), }); Ok(()) } @@ -7193,6 +7565,7 @@ impl Readable for ChannelDetails { (35, inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, feerate_sat_per_1000_weight, option), + (41, channel_shutdown_state, option), }); // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with @@ -7228,12 +7601,13 @@ impl Readable for ChannelDetails { inbound_htlc_minimum_msat, inbound_htlc_maximum_msat, feerate_sat_per_1000_weight, + channel_shutdown_state, }) } } impl_writeable_tlv_based!(PhantomRouteHints, { - (2, channels, vec_type), + (2, channels, required_vec), (4, phantom_scid, required), (6, real_node_pubkey, required), }); @@ -7264,6 +7638,7 @@ impl_writeable_tlv_based!(PendingHTLCInfo, { (6, outgoing_amt_msat, required), (8, outgoing_cltv_value, required), (9, incoming_amt_msat, option), + (10, skimmed_fee_msat, option), }); @@ -7362,6 +7737,7 @@ impl Writeable for ClaimableHTLC { (5, self.total_value_received, option), (6, self.cltv_expiry, required), (8, keysend_preimage, option), + (10, self.counterparty_skimmed_fee_msat, option), }); Ok(()) } @@ -7369,24 +7745,19 @@ impl Writeable for ClaimableHTLC { impl Readable for ClaimableHTLC { fn read(reader: &mut R) -> Result { - let mut prev_hop = crate::util::ser::RequiredWrapper(None); - let mut value = 0; - let mut sender_intended_value = None; - let mut payment_data: Option = None; - let mut cltv_expiry = 0; - let mut total_value_received = None; - let mut total_msat = None; - let mut keysend_preimage: Option = None; - read_tlv_fields!(reader, { + _init_and_read_tlv_fields!(reader, { (0, prev_hop, required), (1, total_msat, option), - (2, value, required), + (2, value_ser, required), (3, sender_intended_value, option), - (4, payment_data, option), + (4, payment_data_opt, option), (5, total_value_received, option), (6, cltv_expiry, required), - (8, keysend_preimage, option) + (8, keysend_preimage, option), + (10, counterparty_skimmed_fee_msat, option), }); + let payment_data: Option = payment_data_opt; + let value = value_ser.0.unwrap(); let onion_payload = match keysend_preimage { Some(p) => { if payment_data.is_some() { @@ -7415,7 +7786,8 @@ impl Readable for ClaimableHTLC { total_value_received, total_msat: total_msat.unwrap(), onion_payload, - cltv_expiry, + cltv_expiry: cltv_expiry.0.unwrap(), + counterparty_skimmed_fee_msat, }) } } @@ -7427,7 +7799,7 @@ impl Readable for HTLCSource { 0 => { let mut session_priv: crate::util::ser::RequiredWrapper = crate::util::ser::RequiredWrapper(None); let mut first_hop_htlc_msat: u64 = 0; - let mut path_hops: Option> = Some(Vec::new()); + let mut path_hops = Vec::new(); let mut payment_id = None; let mut payment_params: Option = None; let mut blinded_tail: Option = None; @@ -7435,7 +7807,7 @@ impl Readable for HTLCSource { (0, session_priv, required), (1, payment_id, option), (2, first_hop_htlc_msat, required), - (4, path_hops, vec_type), + (4, path_hops, required_vec), (5, payment_params, (option: ReadableArgs, 0)), (6, blinded_tail, option), }); @@ -7444,7 +7816,7 @@ impl Readable for HTLCSource { // instead. payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref())); } - let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail }; + let path = Path { hops: path_hops, blinded_tail }; if path.hops.len() == 0 { return Err(DecodeError::InvalidValue); } @@ -7479,7 +7851,7 @@ impl Writeable for HTLCSource { (1, payment_id_opt, option), (2, first_hop_htlc_msat, required), // 3 was previously used to write a PaymentSecret for the payment. - (4, path.hops, vec_type), + (4, path.hops, required_vec), (5, None::, option), // payment_params in LDK versions prior to 0.0.115 (6, path.blinded_tail, option), }); @@ -7710,6 +8082,16 @@ where pending_claiming_payments = None; } + let mut in_flight_monitor_updates: Option>> = None; + for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { + for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() { + if !updates.is_empty() { + if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); } + in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates); + } + } + } + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -7719,7 +8101,8 @@ where (6, monitor_update_blocked_actions_per_peer, option), (7, self.fake_scid_rand_bytes, required), (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), - (9, htlc_purposes, vec_type), + (9, htlc_purposes, required_vec), + (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), }); @@ -7769,6 +8152,14 @@ impl Readable for VecDeque<(Event, Option)> { } } +impl_writeable_tlv_based_enum!(ChannelShutdownState, + (0, NotShuttingDown) => {}, + (2, ShutdownInitiated) => {}, + (4, ResolvingHTLCs) => {}, + (6, NegotiatingClosingFee) => {}, + (8, ShutdownComplete) => {}, ; +); + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -7936,7 +8327,7 @@ where let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = VecDeque::new(); - let mut pending_background_events = Vec::new(); + let mut close_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config) @@ -7944,17 +8335,7 @@ where let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { - // If the channel is ahead of the monitor, return InvalidValue: - log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); - log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); - log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); - log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); - log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); - return Err(DecodeError::InvalidValue); - } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() { @@ -7963,9 +8344,9 @@ where log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast."); log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); - let (monitor_update, mut new_failed_htlcs) = channel.force_shutdown(true); + let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update }); } @@ -7998,7 +8379,6 @@ where log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(), monitor.get_latest_update_id()); - channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } @@ -8021,7 +8401,7 @@ where // If we were persisted and shut down while the initial ChannelMonitor persistence // was in-progress, we never broadcasted the funding transaction and can still // safely discard the channel. - let _ = channel.force_shutdown(false); + let _ = channel.context.force_shutdown(false); channel_closures.push_back((events::Event::ChannelClosed { channel_id: channel.context.channel_id(), user_channel_id: channel.context.get_user_id(), @@ -8045,7 +8425,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -8074,18 +8454,27 @@ where claimable_htlcs_list.push((payment_hash, previous_hops)); } - let peer_count: u64 = Readable::read(reader)?; - let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); - for _ in 0..peer_count { - let peer_pubkey = Readable::read(reader)?; - let peer_state = PeerState { - channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), - latest_features: Readable::read(reader)?, + let peer_state_from_chans = |channel_by_id| { + PeerState { + channel_by_id, + outbound_v1_channel_by_id: HashMap::new(), + inbound_v1_channel_by_id: HashMap::new(), + latest_features: InitFeatures::empty(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, - }; + } + }; + + let peer_count: u64 = Readable::read(reader)?; + let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); + for _ in 0..peer_count { + let peer_pubkey = Readable::read(reader)?; + let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); + let mut peer_state = peer_state_from_chans(peer_chans); + peer_state.latest_features = Readable::read(reader)?; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -8113,24 +8502,6 @@ where } } - for (node_id, peer_mtx) in per_peer_state.iter() { - let peer_state = peer_mtx.lock().unwrap(); - for (_, chan) in peer_state.channel_by_id.iter() { - for update in chan.uncompleted_unblocked_mon_updates() { - if let Some(funding_txo) = chan.context.get_funding_txo() { - log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", - update.update_id, log_bytes!(funding_txo.to_channel_id())); - pending_background_events.push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id: *node_id, funding_txo, update: update.clone(), - }); - } else { - return Err(DecodeError::InvalidValue); - } - } - } - } - let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; @@ -8167,6 +8538,7 @@ where let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; + let mut in_flight_monitor_updates: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -8176,7 +8548,8 @@ where (6, monitor_update_blocked_actions_per_peer, option), (7, fake_scid_rand_bytes, option), (8, events_override, option), - (9, claimable_htlc_purposes, vec_type), + (9, claimable_htlc_purposes, optional_vec), + (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), }); @@ -8210,6 +8583,118 @@ where retry_lock: Mutex::new(()) }; + // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`) + // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to + // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we + // replayed, and for each monitor update we have to replay we have to ensure there's a + // `ChannelMonitor` for it. + // + // In order to do so we first walk all of our live channels (so that we can check their + // state immediately after doing the update replays, when we have the `update_id`s + // available) and then walk any remaining in-flight updates. + // + // Because the actual handling of the in-flight updates is the same, it's macro'ized here: + let mut pending_background_events = Vec::new(); + macro_rules! handle_in_flight_updates { + ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr, + $monitor: expr, $peer_state: expr, $channel_info_log: expr + ) => { { + let mut max_in_flight_update_id = 0; + $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id()); + for update in $chan_in_flight_upds.iter() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", + update.update_id, $channel_info_log, log_bytes!($funding_txo.to_channel_id())); + max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: $counterparty_node_id, + funding_txo: $funding_txo, + update: update.clone(), + }); + } + if $chan_in_flight_upds.is_empty() { + // We had some updates to apply, but it turns out they had completed before we + // were serialized, we just weren't notified of that. Thus, we may have to run + // the completion actions for any monitor updates, but otherwise are done. + pending_background_events.push( + BackgroundEvent::MonitorUpdatesComplete { + counterparty_node_id: $counterparty_node_id, + channel_id: $funding_txo.to_channel_id(), + }); + } + if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { + log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!"); + return Err(DecodeError::InvalidValue); + } + max_in_flight_update_id + } } + } + + for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() { + let mut peer_state_lock = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_, chan) in peer_state.channel_by_id.iter() { + // Channels that were persisted have to be funded, otherwise they should have been + // discarded. + let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; + let monitor = args.channel_monitors.get(&funding_txo) + .expect("We already checked for monitor presence when loading channels"); + let mut max_in_flight_update_id = monitor.get_latest_update_id(); + if let Some(in_flight_upds) = &mut in_flight_monitor_updates { + if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { + max_in_flight_update_id = cmp::max(max_in_flight_update_id, + handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, + funding_txo, monitor, peer_state, "")); + } + } + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { + // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", + log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + if let Some(in_flight_upds) = in_flight_monitor_updates { + for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds { + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + // Now that we've removed all the in-flight monitor updates for channels that are + // still open, we need to replay any monitor updates that are for closed channels, + // creating the neccessary peer_state entries as we go. + let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| { + Mutex::new(peer_state_from_chans(HashMap::new())) + }); + let mut peer_state = peer_state_mutex.lock().unwrap(); + handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, + funding_txo, monitor, peer_state, "closed "); + } else { + log_error!(args.logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is missing.", + log_bytes!(funding_txo.to_channel_id())); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + // Note that we have to do the above replays before we push new monitor updates. + pending_background_events.append(&mut close_background_events); + + // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we + // should ensure we try them again on the inbound edge. We put them here and do so after we + // have a fully-constructed `ChannelManager` at the end. + let mut pending_claims_to_replay = Vec::new(); + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -8220,7 +8705,8 @@ where // We only rebuild the pending payments map if we were most recently serialized by // 0.0.102+ for (_, monitor) in args.channel_monitors.iter() { - if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { + let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()); + if counterparty_opt.is_none() { for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source { if path.hops.is_empty() { @@ -8314,6 +8800,33 @@ where } } } + + // Whether the downstream channel was closed or not, try to re-apply any payment + // preimages from it which may be needed in upstream channels for forwarded + // payments. + let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs() + .into_iter() + .filter_map(|(htlc_source, (htlc, preimage_opt))| { + if let HTLCSource::PreviousHopData(_) = htlc_source { + if let Some(payment_preimage) = preimage_opt { + Some((htlc_source, payment_preimage, htlc.amount_msat, + // Check if `counterparty_opt.is_none()` to see if the + // downstream chan is closed (because we don't have a + // channel_id -> peer map entry). + counterparty_opt.is_none(), + monitor.get_funding_txo().0.to_channel_id())) + } else { None } + } else { + // If it was an outbound payment, we've handled it above - if a preimage + // came in and we persisted the `ChannelManager` we either handled it and + // are good to go or the channel force-closed - we don't have to handle the + // channel still live case here. + None + } + }); + for tuple in outbound_claimed_htlcs_iter { + pending_claims_to_replay.push(tuple); + } } } @@ -8501,6 +9014,12 @@ where blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates .entry(blocked_channel_outpoint.to_channel_id()) .or_insert_with(Vec::new).push(blocking_action.clone()); + } else { + // If the channel we were blocking has closed, we don't need to + // worry about it - the blocked monitor update should never have + // been released from the `Channel` object so it can't have + // completed, and if the channel closed there's no reason to bother + // anymore. } } } @@ -8546,7 +9065,6 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), @@ -8565,6 +9083,14 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } + for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay { + // We use `downstream_closed` in place of `from_onchain` here just as a guess - we + // don't remember in the `ChannelMonitor` where we got a preimage from, but if the + // channel is closed we just assume that it probably came from an on-chain claim. + channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), + downstream_closed, downstream_chan_id); + } + //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. @@ -8582,7 +9108,7 @@ mod tests { use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, RecipientOnionFields, InterceptId}; use crate::ln::functional_test_utils::*; - use crate::ln::msgs; + use crate::ln::msgs::{self, ErrorAction}; use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; @@ -9534,7 +10060,94 @@ mod tests { get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk); } - #[cfg(anchors)] + #[test] + fn reject_excessively_underpaying_htlcs() { + let chanmon_cfg = create_chanmon_cfgs(1); + let node_cfg = create_node_cfgs(1, &chanmon_cfg); + let node_chanmgr = create_node_chanmgrs(1, &node_cfg, &[None]); + let node = create_network(1, &node_cfg, &node_chanmgr); + let sender_intended_amt_msat = 100; + let extra_fee_msat = 10; + let hop_data = msgs::OnionHopData { + amt_to_forward: 100, + outgoing_cltv_value: 42, + format: msgs::OnionHopDataFormat::FinalNode { + keysend_preimage: None, + payment_metadata: None, + payment_data: Some(msgs::FinalOnionHopData { + payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat, + }), + } + }; + // Check that if the amount we received + the penultimate hop extra fee is less than the sender + // intended amount, we fail the payment. + if let Err(crate::ln::channelmanager::ReceiveError { err_code, .. }) = + node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), + sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat)) + { + assert_eq!(err_code, 19); + } else { panic!(); } + + // If amt_received + extra_fee is equal to the sender intended amount, we're fine. + let hop_data = msgs::OnionHopData { // This is the same hop_data as above, OnionHopData doesn't implement Clone + amt_to_forward: 100, + outgoing_cltv_value: 42, + format: msgs::OnionHopDataFormat::FinalNode { + keysend_preimage: None, + payment_metadata: None, + payment_data: Some(msgs::FinalOnionHopData { + payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat, + }), + } + }; + assert!(node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), + sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok()); + } + + #[test] + fn test_inbound_anchors_manual_acceptance() { + // Tests that we properly limit inbound channels when we have the manual-channel-acceptance + // flag set and (sometimes) accept channels as 0conf. + let mut anchors_cfg = test_default_channel_config(); + anchors_cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + + let mut anchors_manual_accept_cfg = anchors_cfg.clone(); + anchors_manual_accept_cfg.manually_accept_inbound_channels = true; + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, + &[Some(anchors_cfg.clone()), Some(anchors_cfg.clone()), Some(anchors_manual_accept_cfg.clone())]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); + + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + match &msg_events[0] { + MessageSendEvent::HandleError { node_id, action } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + match action { + ErrorAction::SendErrorMessage { msg } => + assert_eq!(msg.data, "No channels with anchor outputs accepted".to_owned()), + _ => panic!("Unexpected error action"), + } + } + _ => panic!("Unexpected event"), + } + + nodes[2].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + let events = nodes[2].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => + nodes[2].node.accept_inbound_channel(&temporary_channel_id, &nodes[0].node.get_our_node_id(), 23).unwrap(), + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[2], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + } + #[test] fn test_anchors_zero_fee_htlc_tx_fallback() { // Tests that if both nodes support anchors, but the remote node does not want to accept @@ -9624,6 +10237,25 @@ mod tests { MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("expected BroadcastChannelUpdate event"), } + + // If we provide a channel_id not associated with the peer, we should get an error and no updates + // should be applied to ensure update atomicity as specified in the API docs. + let bad_channel_id = [10; 32]; + let current_fee = nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths; + let new_fee = current_fee + 100; + assert!( + matches!( + nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id, bad_channel_id], &ChannelConfigUpdate { + forwarding_fee_proportional_millionths: Some(new_fee), + ..Default::default() + }), + Err(APIError::ChannelUnavailable { err: _ }), + ) + ); + // Check that the fee hasn't changed for the channel that exists. + assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths, current_fee); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 0); } } @@ -9639,7 +10271,7 @@ pub mod bench { use crate::routing::gossip::NetworkGraph; use crate::routing::router::{PaymentParameters, RouteParameters}; use crate::util::test_utils; - use crate::util::config::UserConfig; + use crate::util::config::{UserConfig, MaxDustHTLCExposure}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; @@ -9677,6 +10309,7 @@ pub mod bench { // Note that this is unrealistic as each payment send will require at least two fsync // calls per node. let network = bitcoin::Network::Testnet; + let genesis_block = bitcoin::blockdata::constants::genesis_block(network); let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@ -9685,6 +10318,7 @@ pub mod bench { let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); + config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253); config.channel_handshake_config.minimum_depth = 1; let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a); @@ -9693,7 +10327,7 @@ pub mod bench { let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_a_holder = ANodeHolder { node: &node_a }; let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); @@ -9703,7 +10337,7 @@ pub mod bench { let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_b_holder = ANodeHolder { node: &node_b }; node_a.peer_connected(&node_b.get_our_node_id(), &Init {