X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=b87be9cbb75cca9bc838592145ca924eee3d6a7a;hb=72c42ee786d11ea1913d2898069f2c6b15285a2a;hp=c7c16e077a2c42026838c828ff128a68baa74a96;hpb=e186593687ee99d68773214b6b63aae0647dd327;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index c7c16e07..b87be9cb 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -317,7 +317,7 @@ impl core::hash::Hash for HTLCSource { } } impl HTLCSource { - #[cfg(not(feature = "grind_signatures"))] + #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))] #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { @@ -507,19 +507,19 @@ struct ClaimablePayments { /// running normally, and specifically must be processed before any other non-background /// [`ChannelMonitorUpdate`]s are applied. enum BackgroundEvent { - /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from - /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public - /// key to handle channel resumption, whereas if the channel has been force-closed we do not - /// need the counterparty node_id. + /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel. + /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the + /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the + /// channel has been force-closed we do not need the counterparty node_id. /// /// Note that any such events are lost on shutdown, so in general they must be updates which /// are regenerated on startup. - ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), + ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the /// channel to continue normal operation. /// /// In general this should be used rather than - /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the + /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`] /// error the other variant is acceptable. /// @@ -752,7 +752,23 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = + ChannelManager< + &'a M, + &'b T, + &'c KeysManager, + &'c KeysManager, + &'c KeysManager, + &'d F, + &'e DefaultRouter< + &'f NetworkGraph<&'g L>, + &'g L, + &'h Mutex, &'g L>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L> + >, + &'g L + >; macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. @@ -1098,7 +1114,6 @@ where /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool, persistence_notifier: Notifier, @@ -1464,6 +1479,9 @@ pub struct ChannelDetails { /// /// [`confirmations_required`]: ChannelDetails::confirmations_required pub is_channel_ready: bool, + /// The stage of the channel's shutdown. + /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116. + pub channel_shutdown_state: Option, /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b) /// the peer is connected, and (c) the channel is not currently negotiating a shutdown. /// @@ -1503,10 +1521,13 @@ impl ChannelDetails { self.short_channel_id.or(self.outbound_scid_alias) } - fn from_channel_context(context: &ChannelContext, - best_block_height: u32, latest_features: InitFeatures) -> Self { - - let balance = context.get_available_balances(); + fn from_channel_context( + context: &ChannelContext, best_block_height: u32, latest_features: InitFeatures, + fee_estimator: &LowerBoundedFeeEstimator + ) -> Self + where F::Target: FeeEstimator + { + let balance = context.get_available_balances(fee_estimator); let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = context.get_holder_counterparty_selected_channel_reserve_satoshis(); ChannelDetails { @@ -1551,10 +1572,33 @@ impl ChannelDetails { inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()), inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(), config: Some(context.config()), + channel_shutdown_state: Some(context.shutdown_state()), } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Further information on the details of the channel shutdown. +/// Upon channels being forced closed (i.e. commitment transaction confirmation detected +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or +/// the channel will be removed shortly. +/// Also note, that in normal operation, peers could disconnect at any of these states +/// and require peer re-connection before making progress onto other states +pub enum ChannelShutdownState { + /// Channel has not sent or received a shutdown message. + NotShuttingDown, + /// Local node has sent a shutdown message for this channel. + ShutdownInitiated, + /// Shutdown message exchanges have concluded and the channels are in the midst of + /// resolving all existing open HTLCs before closing can continue. + ResolvingHTLCs, + /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates. + NegotiatingClosingFee, + /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about + /// to drop the channel. + ShutdownComplete, +} + /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments. /// These include payments that have yet to find a successful path, or have unresolved HTLCs. #[derive(Debug, PartialEq)] @@ -1872,9 +1916,7 @@ macro_rules! handle_new_monitor_update { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); - #[cfg(debug_assertions)] { - debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); - } + debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); match $update_res { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", @@ -1978,6 +2020,8 @@ macro_rules! process_events_body { let mut pending_events = $self.pending_events.lock().unwrap(); pending_events.drain(..num_events); processed_all_events = pending_events.is_empty(); + // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being + // updated here with the `pending_events` lock acquired. $self.pending_events_processor.store(false, Ordering::Release); } @@ -2007,6 +2051,8 @@ where { /// Constructs a new `ChannelManager` to hold several channels and route between them. /// + /// The current time or latest block header time can be provided as the `current_timestamp`. + /// /// This is the main "logic hub" for all channel-related actions, and implements /// [`ChannelMessageHandler`]. /// @@ -2020,7 +2066,11 @@ where /// [`block_connected`]: chain::Listen::block_connected /// [`block_disconnected`]: chain::Listen::block_disconnected /// [`params.best_block.block_hash`]: chain::BestBlock::block_hash - pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters) -> Self { + pub fn new( + fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, + node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters, + current_timestamp: u32, + ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); let inbound_pmt_key_material = node_signer.get_inbound_payment_key_material(); @@ -2052,7 +2102,7 @@ where probing_cookie_secret: entropy_source.get_secure_random_bytes(), - highest_seen_timestamp: AtomicUsize::new(0), + highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize), per_peer_state: FairRwLock::new(HashMap::new()), @@ -2060,7 +2110,6 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), @@ -2192,7 +2241,7 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2218,17 +2267,17 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2261,7 +2310,8 @@ where return peer_state.channel_by_id .iter() .map(|(_, channel)| - ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone())) + ChannelDetails::from_channel_context(&channel.context, best_block_height, + features.clone(), &self.fee_estimator)) .collect(); } vec![] @@ -3065,7 +3115,7 @@ where session_priv: session_priv.clone(), first_hop_htlc_msat: htlc_msat, payment_id, - }, onion_packet, None, &self.logger); + }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { @@ -3782,7 +3832,8 @@ where }); if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, skimmed_fee_msat, &self.logger) + onion_packet, skimmed_fee_msat, &self.fee_estimator, + &self.logger) { if let ChannelError::Ignore(msg) = e { log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); @@ -4097,7 +4148,6 @@ where fn process_background_events(&self) -> NotifyOption { debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread); - #[cfg(debug_assertions)] self.background_events_processed_since_startup.store(true, Ordering::Release); let mut background_events = Vec::new(); @@ -4108,7 +4158,7 @@ where for event in background_events.drain(..) { match event { - BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { + BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { // The channel has already been closed, so no use bothering to care about the // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); @@ -4171,7 +4221,7 @@ where log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - chan.queue_update_fee(new_feerate, &self.logger); + chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger); NotifyOption::DoPersist } @@ -4688,6 +4738,11 @@ where -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! + // If we haven't yet run background events assume we're still deserializing and shouldn't + // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as + // `BackgroundEvent`s. + let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire); + { let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); @@ -4714,14 +4769,26 @@ where log_bytes!(chan_id), action); peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, - peer_state, per_peer_state, chan); - if let Err(e) = res { - // TODO: This is a *critical* error - we probably updated the outbound edge - // of the HTLC's monitor with a preimage. We should retry this monitor - // update over and over again until morale improves. - log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); - return Err((counterparty_node_id, e)); + if !during_init { + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, + peer_state, per_peer_state, chan); + if let Err(e) = res { + // TODO: This is a *critical* error - we probably updated the outbound edge + // of the HTLC's monitor with a preimage. We should retry this monitor + // update over and over again until morale improves. + log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); + return Err((counterparty_node_id, e)); + } + } else { + // If we're running during init we cannot update a monitor directly - + // they probably haven't actually been loaded yet. Instead, push the + // monitor update as a background event. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, + funding_txo: prev_hop.outpoint, + update: monitor_update.clone(), + }); } } return Ok(()); @@ -4734,16 +4801,34 @@ where payment_preimage, }], }; - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); + + if !during_init { + // We update the ChannelMonitor on the backward link, after + // receiving an `update_fulfill_htlc` from the forward link. + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); + if update_res != ChannelMonitorUpdateStatus::Completed { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same event and try + // again on restart. + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, update_res); + } + } else { + // If we're running during init we cannot update a monitor directly - they probably + // haven't actually been loaded yet. Instead, push the monitor update as a background + // event. + // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the + // channel is already closed) we need to ultimately handle the monitor update + // completion action only after we've completed the monitor update. This is the only + // way to guarantee this update *will* be regenerated on startup (otherwise if this was + // from a forwarded HTLC the downstream preimage may be deleted before we claim + // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will + // complete the monitor update completion action from `completion_action`. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(( + prev_hop.outpoint, preimage_update, + ))); } // Note that we do process the completion action here. This totally could be a // duplicate claim, but we have no way of knowing without interrogating the @@ -4761,6 +4846,8 @@ where fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { + debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire), + "We don't support claim_htlc claims during startup - monitors may not be available yet"); self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(hop_data) => { @@ -5144,9 +5231,13 @@ where return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) } else { if !self.default_configuration.manually_accept_inbound_channels { - if channel.context.get_channel_type().requires_zero_conf() { + let channel_type = channel.context.get_channel_type(); + if channel_type.requires_zero_conf() { return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } + if channel_type.requires_anchors_zero_fee_htlc_tx() { + return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone())); + } peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(user_channel_id), @@ -5503,7 +5594,7 @@ where _ => pending_forward_info } }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan); + try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5676,22 +5767,27 @@ where } } - // We only want to push a PendingHTLCsForwardable event if no others are queued. fn push_pending_forwards_ev(&self) { let mut pending_events = self.pending_events.lock().unwrap(); - let forward_ev_exists = pending_events.iter() - .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) - .is_some(); - if !forward_ev_exists { - pending_events.push_back((events::Event::PendingHTLCsForwardable { - time_forwardable: - Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), + let is_processing_events = self.pending_events_processor.load(Ordering::Acquire); + let num_forward_events = pending_events.iter().filter(|(ev, _)| + if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false } + ).count(); + // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing + // events is done in batches and they are not removed until we're done processing each + // batch. Since handling a `PendingHTLCsForwardable` event will call back into the + // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom + // payments will need an additional forwarding event before being claimed to make them look + // real by taking more time. + if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 { + pending_events.push_back((Event::PendingHTLCsForwardable { + time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), }, None)); } } /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote - /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event + /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of /// the [`ChannelMonitorUpdate`] in question. fn raa_monitor_updates_held(&self, @@ -5720,7 +5816,7 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().context.get_funding_txo(); - let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) @@ -5991,7 +6087,7 @@ where let counterparty_node_id = chan.context.get_counterparty_node_id(); let funding_txo = chan.context.get_funding_txo(); let (monitor_opt, holding_cell_failed_htlcs) = - chan.maybe_free_holding_cell_htlcs(&self.logger); + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger); if !holding_cell_failed_htlcs.is_empty() { failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } @@ -6275,7 +6371,7 @@ where inflight_htlcs } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { let events = core::cell::RefCell::new(Vec::new()); let event_handler = |event: events::Event| events.borrow_mut().push(event); @@ -6308,7 +6404,7 @@ where /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an /// [`Event`] being handled) completes, this should be called to restore the channel to normal /// operation. It will double-check that nothing *else* is also blocking the same channel from - /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly. + /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly. fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { let mut errors = Vec::new(); loop { @@ -7265,7 +7361,7 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType /// Fetches the set of [`InitFeatures`] flags which are provided by or required by /// [`ChannelManager`]. -pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { +pub fn provided_init_features(config: &UserConfig) -> InitFeatures { // Note that if new features are added here which other peers may (eventually) require, we // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. @@ -7281,11 +7377,8 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { features.set_channel_type_optional(); features.set_scid_privacy_optional(); features.set_zero_conf_optional(); - #[cfg(anchors)] - { // Attributes are not allowed on if expressions on our current MSRV of 1.41. - if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { - features.set_anchors_zero_fee_htlc_tx_optional(); - } + if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { + features.set_anchors_zero_fee_htlc_tx_optional(); } features } @@ -7342,6 +7435,7 @@ impl Writeable for ChannelDetails { (35, self.inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, self.feerate_sat_per_1000_weight, option), + (41, self.channel_shutdown_state, option), }); Ok(()) } @@ -7379,6 +7473,7 @@ impl Readable for ChannelDetails { (35, inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, feerate_sat_per_1000_weight, option), + (41, channel_shutdown_state, option), }); // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with @@ -7414,6 +7509,7 @@ impl Readable for ChannelDetails { inbound_htlc_minimum_msat, inbound_htlc_maximum_msat, feerate_sat_per_1000_weight, + channel_shutdown_state, }) } } @@ -7964,6 +8060,14 @@ impl Readable for VecDeque<(Event, Option)> { } } +impl_writeable_tlv_based_enum!(ChannelShutdownState, + (0, NotShuttingDown) => {}, + (2, ShutdownInitiated) => {}, + (4, ResolvingHTLCs) => {}, + (6, NegotiatingClosingFee) => {}, + (8, ShutdownComplete) => {}, ; +); + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -8229,7 +8333,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -8441,12 +8545,12 @@ where funding_txo, monitor, peer_state, "")); } } - if chan.get_latest_complete_monitor_update_id() > max_in_flight_update_id { + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { // If the channel is ahead of the monitor, return InvalidValue: log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); - log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_complete_monitor_update_id()); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); @@ -8484,6 +8588,11 @@ where // Note that we have to do the above replays before we push new monitor updates. pending_background_events.append(&mut close_background_events); + // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we + // should ensure we try them again on the inbound edge. We put them here and do so after we + // have a fully-constructed `ChannelManager` at the end. + let mut pending_claims_to_replay = Vec::new(); + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -8494,7 +8603,8 @@ where // We only rebuild the pending payments map if we were most recently serialized by // 0.0.102+ for (_, monitor) in args.channel_monitors.iter() { - if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { + let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()); + if counterparty_opt.is_none() { for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source { if path.hops.is_empty() { @@ -8588,6 +8698,33 @@ where } } } + + // Whether the downstream channel was closed or not, try to re-apply any payment + // preimages from it which may be needed in upstream channels for forwarded + // payments. + let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs() + .into_iter() + .filter_map(|(htlc_source, (htlc, preimage_opt))| { + if let HTLCSource::PreviousHopData(_) = htlc_source { + if let Some(payment_preimage) = preimage_opt { + Some((htlc_source, payment_preimage, htlc.amount_msat, + // Check if `counterparty_opt.is_none()` to see if the + // downstream chan is closed (because we don't have a + // channel_id -> peer map entry). + counterparty_opt.is_none(), + monitor.get_funding_txo().0.to_channel_id())) + } else { None } + } else { + // If it was an outbound payment, we've handled it above - if a preimage + // came in and we persisted the `ChannelManager` we either handled it and + // are good to go or the channel force-closed - we don't have to handle the + // channel still live case here. + None + } + }); + for tuple in outbound_claimed_htlcs_iter { + pending_claims_to_replay.push(tuple); + } } } @@ -8820,7 +8957,6 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), @@ -8839,6 +8975,14 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } + for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay { + // We use `downstream_closed` in place of `from_onchain` here just as a guess - we + // don't remember in the `ChannelMonitor` where we got a preimage from, but if the + // channel is closed we just assume that it probably came from an on-chain claim. + channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), + downstream_closed, downstream_chan_id); + } + //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. @@ -8856,7 +9000,7 @@ mod tests { use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, RecipientOnionFields, InterceptId}; use crate::ln::functional_test_utils::*; - use crate::ln::msgs; + use crate::ln::msgs::{self, ErrorAction}; use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; @@ -9852,7 +9996,50 @@ mod tests { sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok()); } - #[cfg(anchors)] + #[test] + fn test_inbound_anchors_manual_acceptance() { + // Tests that we properly limit inbound channels when we have the manual-channel-acceptance + // flag set and (sometimes) accept channels as 0conf. + let mut anchors_cfg = test_default_channel_config(); + anchors_cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + + let mut anchors_manual_accept_cfg = anchors_cfg.clone(); + anchors_manual_accept_cfg.manually_accept_inbound_channels = true; + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, + &[Some(anchors_cfg.clone()), Some(anchors_cfg.clone()), Some(anchors_manual_accept_cfg.clone())]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); + + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + match &msg_events[0] { + MessageSendEvent::HandleError { node_id, action } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + match action { + ErrorAction::SendErrorMessage { msg } => + assert_eq!(msg.data, "No channels with anchor outputs accepted".to_owned()), + _ => panic!("Unexpected error action"), + } + } + _ => panic!("Unexpected event"), + } + + nodes[2].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + let events = nodes[2].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => + nodes[2].node.accept_inbound_channel(&temporary_channel_id, &nodes[0].node.get_our_node_id(), 23).unwrap(), + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[2], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + } + #[test] fn test_anchors_zero_fee_htlc_tx_fallback() { // Tests that if both nodes support anchors, but the remote node does not want to accept @@ -9957,7 +10144,7 @@ pub mod bench { use crate::routing::gossip::NetworkGraph; use crate::routing::router::{PaymentParameters, RouteParameters}; use crate::util::test_utils; - use crate::util::config::UserConfig; + use crate::util::config::{UserConfig, MaxDustHTLCExposure}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; @@ -9995,6 +10182,7 @@ pub mod bench { // Note that this is unrealistic as each payment send will require at least two fsync // calls per node. let network = bitcoin::Network::Testnet; + let genesis_block = bitcoin::blockdata::constants::genesis_block(network); let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@ -10003,6 +10191,7 @@ pub mod bench { let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); + config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253); config.channel_handshake_config.minimum_depth = 1; let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a); @@ -10011,7 +10200,7 @@ pub mod bench { let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_a_holder = ANodeHolder { node: &node_a }; let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); @@ -10021,7 +10210,7 @@ pub mod bench { let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), - }); + }, genesis_block.header.time); let node_b_holder = ANodeHolder { node: &node_b }; node_a.peer_connected(&node_b.get_our_node_id(), &Init {