X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=c7fc70a4593d66601b7d0d0e2acf8759196e5ffd;hb=c383f06538ac664fe3312daf765595ba106d5b98;hp=fab6a2d1340289be0a1fa8d5c7c8752a25ca0bbb;hpb=82e0df5e4dcc41c2d32ded6ff433727b15570ea4;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index fab6a2d1..c7fc70a4 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -40,12 +40,12 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; -use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel}; +use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel}; use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; #[cfg(any(feature = "_test_utils", test))] -use crate::ln::features::InvoiceFeatures; +use crate::ln::features::Bolt11InvoiceFeatures; use crate::routing::gossip::NetworkGraph; -use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router}; +use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router}; use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use crate::ln::msgs; use crate::ln::onion_utils; @@ -53,7 +53,7 @@ use crate::ln::onion_utils::HTLCFailReason; use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; #[cfg(test)] use crate::ln::outbound_payment; -use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment}; +use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs}; use crate::ln::wire::Encode; use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner}; use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate}; @@ -317,7 +317,7 @@ impl core::hash::Hash for HTLCSource { } } impl HTLCSource { - #[cfg(not(feature = "grind_signatures"))] + #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))] #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { @@ -507,19 +507,19 @@ struct ClaimablePayments { /// running normally, and specifically must be processed before any other non-background /// [`ChannelMonitorUpdate`]s are applied. enum BackgroundEvent { - /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from - /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public - /// key to handle channel resumption, whereas if the channel has been force-closed we do not - /// need the counterparty node_id. + /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel. + /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the + /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the + /// channel has been force-closed we do not need the counterparty node_id. /// /// Note that any such events are lost on shutdown, so in general they must be updates which /// are regenerated on startup. - ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), + ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the /// channel to continue normal operation. /// /// In general this should be used rather than - /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the + /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`] /// error the other variant is acceptable. /// @@ -530,6 +530,13 @@ enum BackgroundEvent { funding_txo: OutPoint, update: ChannelMonitorUpdate }, + /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have + /// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending + /// on a channel. + MonitorUpdatesComplete { + counterparty_node_id: PublicKey, + channel_id: [u8; 32], + }, } #[derive(Debug)] @@ -633,6 +640,13 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the + /// user but which have not yet completed. + /// + /// Note that the channel may no longer exist. For example if the channel was closed but we + /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update + /// for a missing channel. + in_flight_monitor_updates: BTreeMap>, /// Map from a specific channel to some action(s) that should be taken when all pending /// [`ChannelMonitorUpdate`]s for the channel complete updating. /// @@ -668,9 +682,10 @@ impl PeerState { return false } self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() + && self.in_flight_monitor_updates.is_empty() } - // Returns a count of all channels we have with this peer, including pending channels. + // Returns a count of all channels we have with this peer, including unfunded channels. fn total_channel_count(&self) -> usize { self.channel_by_id.len() + self.outbound_v1_channel_by_id.len() + @@ -744,7 +759,23 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = + ChannelManager< + &'a M, + &'b T, + &'c KeysManager, + &'c KeysManager, + &'c KeysManager, + &'d F, + &'e DefaultRouter< + &'f NetworkGraph<&'g L>, + &'g L, + &'h Mutex, &'g L>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L> + >, + &'g L + >; macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. @@ -1090,7 +1121,6 @@ where /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool, persistence_notifier: Notifier, @@ -1456,6 +1486,9 @@ pub struct ChannelDetails { /// /// [`confirmations_required`]: ChannelDetails::confirmations_required pub is_channel_ready: bool, + /// The stage of the channel's shutdown. + /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116. + pub channel_shutdown_state: Option, /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b) /// the peer is connected, and (c) the channel is not currently negotiating a shutdown. /// @@ -1495,10 +1528,13 @@ impl ChannelDetails { self.short_channel_id.or(self.outbound_scid_alias) } - fn from_channel_context(context: &ChannelContext, - best_block_height: u32, latest_features: InitFeatures) -> Self { - - let balance = context.get_available_balances(); + fn from_channel_context( + context: &ChannelContext, best_block_height: u32, latest_features: InitFeatures, + fee_estimator: &LowerBoundedFeeEstimator + ) -> Self + where F::Target: FeeEstimator + { + let balance = context.get_available_balances(fee_estimator); let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = context.get_holder_counterparty_selected_channel_reserve_satoshis(); ChannelDetails { @@ -1543,10 +1579,33 @@ impl ChannelDetails { inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()), inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(), config: Some(context.config()), + channel_shutdown_state: Some(context.shutdown_state()), } } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Further information on the details of the channel shutdown. +/// Upon channels being forced closed (i.e. commitment transaction confirmation detected +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or +/// the channel will be removed shortly. +/// Also note, that in normal operation, peers could disconnect at any of these states +/// and require peer re-connection before making progress onto other states +pub enum ChannelShutdownState { + /// Channel has not sent or received a shutdown message. + NotShuttingDown, + /// Local node has sent a shutdown message for this channel. + ShutdownInitiated, + /// Shutdown message exchanges have concluded and the channels are in the midst of + /// resolving all existing open HTLCs before closing can continue. + ResolvingHTLCs, + /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates. + NegotiatingClosingFee, + /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about + /// to drop the channel. + ShutdownComplete, +} + /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments. /// These include payments that have yet to find a successful path, or have unresolved HTLCs. #[derive(Debug, PartialEq)] @@ -1690,12 +1749,12 @@ macro_rules! convert_chan_err { }, } }; - ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, PREFUNDED) => { + ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, UNFUNDED) => { match $err { - // We should only ever have `ChannelError::Close` when prefunded channels error. + // We should only ever have `ChannelError::Close` when unfunded channels error. // In any case, just close the channel. ChannelError::Warn(msg) | ChannelError::Ignore(msg) | ChannelError::Close(msg) => { - log_error!($self.logger, "Closing prefunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg); + log_error!($self.logger, "Closing unfunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg); update_maps_on_chan_removal!($self, &$channel_context); let shutdown_res = $channel_context.force_shutdown(false); (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel_context.get_user_id(), @@ -1725,7 +1784,7 @@ macro_rules! try_v1_outbound_chan_entry { match $res { Ok(res) => res, Err(e) => { - let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), PREFUNDED); + let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), UNFUNDED); if drop { $entry.remove_entry(); } @@ -1811,7 +1870,7 @@ macro_rules! emit_channel_ready_event { } macro_rules! handle_monitor_update_completion { - ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { + ($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let mut updates = $chan.monitor_updating_restored(&$self.logger, &$self.node_signer, $self.genesis_hash, &$self.default_configuration, $self.best_block.read().unwrap().height()); @@ -1860,24 +1919,22 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); - #[cfg(debug_assertions)] { - debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); - } + debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); match $update_res { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", log_bytes!($chan.context.channel_id()[..])); - Ok(()) + Ok(false) }, ChannelMonitorUpdateStatus::PermanentFailure => { log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan.context.channel_id()[..])); update_maps_on_chan_removal!($self, &$chan.context); - let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown( + let res = Err(MsgHandleErrInternal::from_finish_shutdown( "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(), $chan.context.get_user_id(), $chan.context.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok())); @@ -1885,16 +1942,42 @@ macro_rules! handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - $chan.complete_one_mon_update($update_id); - if $chan.no_monitor_updates_pending() { - handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); - } - Ok(()) + $completed; + Ok(true) }, } } }; - ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + }; + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) + }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) + .or_insert_with(Vec::new); + // During startup, we push monitor updates as background events through to here in + // order to replay updates that were in-flight when we shut down. Thus, we have to + // filter for uniqueness here. + let idx = in_flight_updates.iter().position(|upd| upd == &$update) + .unwrap_or_else(|| { + in_flight_updates.push($update); + in_flight_updates.len() - 1 + }); + let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); + handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, $chan, _internal, $remove, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { + handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + } + }) + } }; + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) } } @@ -1944,6 +2027,8 @@ macro_rules! process_events_body { let mut pending_events = $self.pending_events.lock().unwrap(); pending_events.drain(..num_events); processed_all_events = pending_events.is_empty(); + // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being + // updated here with the `pending_events` lock acquired. $self.pending_events_processor.store(false, Ordering::Release); } @@ -2032,7 +2117,6 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), @@ -2162,9 +2246,10 @@ where for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + // Only `Channels` in the channel_by_id map can be considered funded. for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2190,17 +2275,17 @@ where let peer_state = &mut *peer_state_lock; for (_channel_id, channel) in peer_state.channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone()); + peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } } @@ -2230,10 +2315,15 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let features = &peer_state.latest_features; + let chan_context_to_details = |context| { + ChannelDetails::from_channel_context(context, best_block_height, features.clone(), &self.fee_estimator) + }; return peer_state.channel_by_id .iter() - .map(|(_, channel)| - ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone())) + .map(|(_, channel)| &channel.context) + .chain(peer_state.outbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) + .chain(peer_state.inbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) + .map(chan_context_to_details) .collect(); } vec![] @@ -2290,49 +2380,58 @@ where let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { - let per_peer_state = self.per_peer_state.read().unwrap(); + { + let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex = per_peer_state.get(counterparty_node_id) - .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_entry) => { - let funding_txo_opt = chan_entry.get().context.get_funding_txo(); - let their_features = &peer_state.latest_features; - let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() - .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; - failed_htlcs = htlcs; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg: shutdown_msg, - }); + match peer_state.channel_by_id.entry(channel_id.clone()) { + hash_map::Entry::Occupied(mut chan_entry) => { + let funding_txo_opt = chan_entry.get().context.get_funding_txo(); + let their_features = &peer_state.latest_features; + let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() + .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; + failed_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt.take() { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); - } + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg: shutdown_msg, + }); - if chan_entry.get().is_shutdown() { - let channel = remove_channel!(self, chan_entry); - if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: channel_update - }); + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt.take() { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); } - self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed); - } - break Ok(()); - }, - hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), counterparty_node_id) }) + + if chan_entry.get().is_shutdown() { + let channel = remove_channel!(self, chan_entry); + if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: channel_update + }); + } + self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed); + } + break Ok(()); + }, + hash_map::Entry::Vacant(_) => (), + } } + // If we reach this point, it means that the channel_id either refers to an unfunded channel or + // it does not exist for this peer. Either way, we can attempt to force-close it. + // + // An appropriate error will be returned for non-existence of the channel if that's the case. + return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ()) + // TODO(dunxen): This is still not ideal as we're doing some extra lookups. + // Fix this with https://github.com/lightningdevkit/rust-lightning/issues/2422 }; for htlc_source in failed_htlcs.drain(..) { @@ -2451,14 +2550,14 @@ where self.issue_channel_close_events(&chan.get().context, closure_reason); let mut chan = remove_channel!(self, chan); self.finish_force_close_channel(chan.context.force_shutdown(false)); - // Prefunded channel has no update + // Unfunded channel has no update (None, chan.context.get_counterparty_node_id()) } else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) { log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); self.issue_channel_close_events(&chan.get().context, closure_reason); let mut chan = remove_channel!(self, chan); self.finish_force_close_channel(chan.context.force_shutdown(false)); - // Prefunded channel has no update + // Unfunded channel has no update (None, chan.context.get_counterparty_node_id()) } else { return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) }); @@ -2998,10 +3097,17 @@ where #[cfg(test)] pub(crate) fn test_send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { let _lck = self.total_consistency_lock.read().unwrap(); - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes) + self.send_payment_along_path(SendAlongPathArgs { + path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, + session_priv_bytes + }) } - fn send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { + fn send_payment_along_path(&self, args: SendAlongPathArgs) -> Result<(), APIError> { + let SendAlongPathArgs { + path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, + session_priv_bytes + } = args; // The top-level caller should hold the total_consistency_lock read lock. debug_assert!(self.total_consistency_lock.try_write().is_err()); @@ -3038,22 +3144,21 @@ where session_priv: session_priv.clone(), first_hop_htlc_msat: htlc_msat, payment_id, - }, onion_packet, None, &self.logger); + }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_entry!(self, send_res, chan) { Some(monitor_update) => { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update); - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) { - break Err(e); - } - if update_res == ChannelMonitorUpdateStatus::InProgress { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { + Err(e) => break Err(e), + Ok(false) => { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); + }, + Ok(true) => {}, } }, None => { }, @@ -3122,6 +3227,7 @@ where /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a /// different route unless you intend to pay twice! /// + /// [`RouteHop`]: crate::routing::router::RouteHop /// [`Event::PaymentSent`]: events::Event::PaymentSent /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs @@ -3131,9 +3237,9 @@ where let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments - .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, + &self.entropy_source, &self.node_signer, best_block_height, + |args| self.send_payment_along_path(args)) } /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on @@ -3145,18 +3251,16 @@ where .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, - &self.pending_events, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + &self.pending_events, |args| self.send_payment_along_path(args)) } #[cfg(test)] pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, + keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, + best_block_height, |args| self.send_payment_along_path(args)) } #[cfg(test)] @@ -3210,9 +3314,7 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment_with_route( route, payment_preimage, recipient_onion, payment_id, &self.entropy_source, - &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + &self.node_signer, best_block_height, |args| self.send_payment_along_path(args)) } /// Similar to [`ChannelManager::send_spontaneous_payment`], but will automatically find a route @@ -3228,9 +3330,7 @@ where self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, - &self.logger, &self.pending_events, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + &self.logger, &self.pending_events, |args| self.send_payment_along_path(args)) } /// Send a payment that is probing the given route for liquidity. We calculate the @@ -3239,9 +3339,9 @@ where pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) + self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, + &self.entropy_source, &self.node_signer, best_block_height, + |args| self.send_payment_along_path(args)) } /// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a @@ -3266,7 +3366,7 @@ where Some(chan) => { let funding_txo = find_funding_output(&chan, &funding_transaction)?; - let funding_res = chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger) + let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger) .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e { let channel_id = chan.context.channel_id(); let user_id = chan.context.get_user_id(); @@ -3439,27 +3539,48 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for channel_id in channel_ids { - if !peer_state.channel_by_id.contains_key(channel_id) { + if !peer_state.has_channel(channel_id) { return Err(APIError::ChannelUnavailable { err: format!("Channel with ID {} was not found for the passed counterparty_node_id {}", log_bytes!(*channel_id), counterparty_node_id), }); - } + }; } for channel_id in channel_ids { - let channel = peer_state.channel_by_id.get_mut(channel_id).unwrap(); - let mut config = channel.context.config(); - config.apply(config_update); - if !channel.context.update_config(&config) { + if let Some(channel) = peer_state.channel_by_id.get_mut(channel_id) { + let mut config = channel.context.config(); + config.apply(config_update); + if !channel.context.update_config(&config) { + continue; + } + if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); + } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + node_id: channel.context.get_counterparty_node_id(), + msg, + }); + } continue; } - if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); - } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.context.get_counterparty_node_id(), - msg, + + let context = if let Some(channel) = peer_state.inbound_v1_channel_by_id.get_mut(channel_id) { + &mut channel.context + } else if let Some(channel) = peer_state.outbound_v1_channel_by_id.get_mut(channel_id) { + &mut channel.context + } else { + // This should not be reachable as we've already checked for non-existence in the previous channel_id loop. + debug_assert!(false); + return Err(APIError::ChannelUnavailable { + err: format!( + "Channel with ID {} for passed counterparty_node_id {} disappeared after we confirmed its existence - this should not be reachable!", + log_bytes!(*channel_id), counterparty_node_id), }); - } + }; + let mut config = context.config(); + config.apply(config_update); + // We update the config, but we MUST NOT broadcast a `channel_update` before `channel_ready` + // which would be the case for pending inbound/outbound channels. + context.update_config(&config); } Ok(()) } @@ -3756,7 +3877,8 @@ where }); if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, skimmed_fee_msat, &self.logger) + onion_packet, skimmed_fee_msat, &self.fee_estimator, + &self.logger) { if let ChannelError::Ignore(msg) = e { log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg); @@ -4045,9 +4167,7 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, - &self.pending_events, &self.logger, - |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)); + &self.pending_events, &self.logger, |args| self.send_payment_along_path(args)); for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards.drain(..) { self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination); @@ -4071,7 +4191,6 @@ where fn process_background_events(&self) -> NotifyOption { debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread); - #[cfg(debug_assertions)] self.background_events_processed_since_startup.store(true, Ordering::Release); let mut background_events = Vec::new(); @@ -4082,14 +4201,13 @@ where for event in background_events.drain(..) { match event { - BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { + BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { // The channel has already been closed, so no use bothering to care about the // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { - let update_res = self.chain_monitor.update_channel(funding_txo, &update); - + let mut updated_chan = false; let res = { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { @@ -4097,12 +4215,18 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan) => { - handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update.clone(), + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), } } else { Ok(()) } }; + if !updated_chan { + // TODO: Track this as in-flight even though the channel is closed. + let _ = self.chain_monitor.update_channel(funding_txo, &update); + } // TODO: If this channel has since closed, we're likely providing a payment // preimage update, which we must ensure is durable! We currently don't, // however, ensure that. @@ -4112,6 +4236,22 @@ where } let _ = handle_error!(self, res, counterparty_node_id); }, + BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) { + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan); + } else { + let update_actions = peer_state.monitor_update_blocked_actions + .remove(&channel_id).unwrap_or(Vec::new()); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(update_actions); + } + } + }, } } NotifyOption::DoPersist @@ -4140,7 +4280,7 @@ where log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - chan.queue_update_fee(new_feerate, &self.logger); + chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger); NotifyOption::DoPersist } @@ -4153,13 +4293,19 @@ where PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { let mut should_persist = self.process_background_events(); - let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for (chan_id, chan) in peer_state.channel_by_id.iter_mut() { + let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { + min_mempool_feerate + } else { + normal_feerate + }; let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } } @@ -4179,6 +4325,7 @@ where /// * Expiring a channel's previous [`ChannelConfig`] if necessary to only allow forwarding HTLCs /// with the current [`ChannelConfig`]. /// * Removing peers which have disconnected but and no longer have any channels. + /// * Force-closing and removing channels which have not completed establishment in a timely manner. /// /// Note that this may cause reentrancy through [`chain::Watch::update_channel`] calls or feerate /// estimate fetches. @@ -4189,7 +4336,8 @@ where PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { let mut should_persist = self.process_background_events(); - let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); + let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); @@ -4202,6 +4350,11 @@ where let pending_msg_events = &mut peer_state.pending_msg_events; let counterparty_node_id = *counterparty_node_id; peer_state.channel_by_id.retain(|chan_id, chan| { + let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { + min_mempool_feerate + } else { + normal_feerate + }; let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } @@ -4267,6 +4420,26 @@ where true }); + + let process_unfunded_channel_tick = | + chan_id: &[u8; 32], + chan_context: &mut ChannelContext<::Signer>, + unfunded_chan_context: &mut UnfundedChannelContext, + | { + chan_context.maybe_expire_prev_config(); + if unfunded_chan_context.should_expire_unfunded_channel() { + log_error!(self.logger, "Force-closing pending outbound channel {} for not establishing in a timely manner", log_bytes!(&chan_id[..])); + update_maps_on_chan_removal!(self, &chan_context); + self.issue_channel_close_events(&chan_context, ClosureReason::HolderForceClosed); + self.finish_force_close_channel(chan_context.force_shutdown(false)); + false + } else { + true + } + }; + peer_state.outbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context)); + peer_state.inbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context)); + if peer_state.ok_to_remove(true) { pending_peers_awaiting_removal.push(counterparty_node_id); } @@ -4657,6 +4830,11 @@ where -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! + // If we haven't yet run background events assume we're still deserializing and shouldn't + // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as + // `BackgroundEvent`s. + let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire); + { let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); @@ -4683,16 +4861,26 @@ where log_bytes!(chan_id), action); peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update); - let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan); - if let Err(e) = res { - // TODO: This is a *critical* error - we probably updated the outbound edge - // of the HTLC's monitor with a preimage. We should retry this monitor - // update over and over again until morale improves. - log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); - return Err((counterparty_node_id, e)); + if !during_init { + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, + peer_state, per_peer_state, chan); + if let Err(e) = res { + // TODO: This is a *critical* error - we probably updated the outbound edge + // of the HTLC's monitor with a preimage. We should retry this monitor + // update over and over again until morale improves. + log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); + return Err((counterparty_node_id, e)); + } + } else { + // If we're running during init we cannot update a monitor directly - + // they probably haven't actually been loaded yet. Instead, push the + // monitor update as a background event. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, + funding_txo: prev_hop.outpoint, + update: monitor_update.clone(), + }); } } return Ok(()); @@ -4705,16 +4893,34 @@ where payment_preimage, }], }; - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); + + if !during_init { + // We update the ChannelMonitor on the backward link, after + // receiving an `update_fulfill_htlc` from the forward link. + let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update); + if update_res != ChannelMonitorUpdateStatus::Completed { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same event and try + // again on restart. + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, update_res); + } + } else { + // If we're running during init we cannot update a monitor directly - they probably + // haven't actually been loaded yet. Instead, push the monitor update as a background + // event. + // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the + // channel is already closed) we need to ultimately handle the monitor update + // completion action only after we've completed the monitor update. This is the only + // way to guarantee this update *will* be regenerated on startup (otherwise if this was + // from a forwarded HTLC the downstream preimage may be deleted before we claim + // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will + // complete the monitor update completion action from `completion_action`. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(( + prev_hop.outpoint, preimage_update, + ))); } // Note that we do process the completion action here. This totally could be a // duplicate claim, but we have no way of knowing without interrogating the @@ -4732,6 +4938,8 @@ where fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { + debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire), + "We don't support claim_htlc claims during startup - monitors may not be available yet"); self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(hop_data) => { @@ -4887,18 +5095,29 @@ where if peer_state_mutex_opt.is_none() { return } peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - let mut channel = { - match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ - hash_map::Entry::Occupied(chan) => chan, - hash_map::Entry::Vacant(_) => return, - } - }; - log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}", - highest_applied_update_id, channel.get().context.get_latest_monitor_update_id()); - if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id { + let channel = + if let Some(chan) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) { + chan + } else { + let update_actions = peer_state.monitor_update_blocked_actions + .remove(&funding_txo.to_channel_id()).unwrap_or(Vec::new()); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(update_actions); + return; + }; + let remaining_in_flight = + if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) { + pending.retain(|upd| upd.update_id > highest_applied_update_id); + pending.len() + } else { 0 }; + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.", + highest_applied_update_id, channel.context.get_latest_monitor_update_id(), + remaining_in_flight); + if !channel.is_awaiting_monitor_update() || channel.context.get_latest_monitor_update_id() != highest_applied_update_id { return; } - handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut()); + handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. @@ -5109,9 +5328,13 @@ where return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) } else { if !self.default_configuration.manually_accept_inbound_channels { - if channel.context.get_channel_type().requires_zero_conf() { + let channel_type = channel.context.get_channel_type(); + if channel_type.requires_zero_conf() { return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } + if channel_type.requires_anchors_zero_fee_htlc_tx() { + return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone())); + } peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(user_channel_id), @@ -5221,8 +5444,9 @@ where let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); let chan = e.insert(chan); - let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) }); + let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, + per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, + { peer_state.channel_by_id.remove(&new_channel_id) }); // Note that we reply with the new channel_id in error messages if we gave up on the // channel, not the temporary_channel_id. This is compatible with ourselves, but the @@ -5234,7 +5458,7 @@ where if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { res.0 = None; } - res + res.map(|_| ()) } } } @@ -5255,7 +5479,7 @@ where let monitor = try_chan_entry!(self, chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan); + let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { // We weren't able to watch the channel to begin with, so no updates should be made on // it. Previously, full_stack_target found an (unreachable) panic when the @@ -5264,7 +5488,7 @@ where shutdown_finish.0.take(); } } - res + res.map(|_| ()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@ -5326,39 +5550,50 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(msg.channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_entry) => { - - if !chan_entry.get().received_shutdown() { - log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", - log_bytes!(msg.channel_id), - if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); - } + // TODO(dunxen): Fix this duplication when we switch to a single map with enums as per + // https://github.com/lightningdevkit/rust-lightning/issues/2422 + if let hash_map::Entry::Occupied(chan_entry) = peer_state.outbound_v1_channel_by_id.entry(msg.channel_id.clone()) { + log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..])); + self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + let mut chan = remove_channel!(self, chan_entry); + self.finish_force_close_channel(chan.context.force_shutdown(false)); + return Ok(()); + } else if let hash_map::Entry::Occupied(chan_entry) = peer_state.inbound_v1_channel_by_id.entry(msg.channel_id.clone()) { + log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..])); + self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + let mut chan = remove_channel!(self, chan_entry); + self.finish_force_close_channel(chan.context.force_shutdown(false)); + return Ok(()); + } else if let hash_map::Entry::Occupied(mut chan_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) { + if !chan_entry.get().received_shutdown() { + log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", + log_bytes!(msg.channel_id), + if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); + } - let funding_txo_opt = chan_entry.get().context.get_funding_txo(); - let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, - chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); - dropped_htlcs = htlcs; + let funding_txo_opt = chan_entry.get().context.get_funding_txo(); + let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, + chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); + dropped_htlcs = htlcs; - if let Some(msg) = shutdown { - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg, - }); - } + if let Some(msg) = shutdown { + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg, + }); + } - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt { - let update_id = monitor_update.update_id; - let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update); - break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry); - } - break Ok(()); - }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); + } + break Ok(()); + } else { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; for htlc_source in dropped_htlcs.drain(..) { @@ -5468,7 +5703,7 @@ where _ => pending_forward_info } }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan); + try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5550,10 +5785,8 @@ where let funding_txo = chan.get().context.get_funding_txo(); let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, + peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5643,22 +5876,27 @@ where } } - // We only want to push a PendingHTLCsForwardable event if no others are queued. fn push_pending_forwards_ev(&self) { let mut pending_events = self.pending_events.lock().unwrap(); - let forward_ev_exists = pending_events.iter() - .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) - .is_some(); - if !forward_ev_exists { - pending_events.push_back((events::Event::PendingHTLCsForwardable { - time_forwardable: - Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), + let is_processing_events = self.pending_events_processor.load(Ordering::Acquire); + let num_forward_events = pending_events.iter().filter(|(ev, _)| + if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false } + ).count(); + // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing + // events is done in batches and they are not removed until we're done processing each + // batch. Since handling a `PendingHTLCsForwardable` event will call back into the + // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom + // payments will need an additional forwarding event before being claimed to make them look + // real by taking more time. + if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 { + pending_events.push_back((Event::PendingHTLCsForwardable { + time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), }, None)); } } /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote - /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event + /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of /// the [`ChannelMonitorUpdate`] in question. fn raa_monitor_updates_held(&self, @@ -5687,12 +5925,10 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().context.get_funding_txo(); - let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan); let res = if let Some(monitor_update) = monitor_update_opt { - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan) + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) } else { Ok(()) }; (htlcs_to_fail, res) }, @@ -5960,18 +6196,15 @@ where let counterparty_node_id = chan.context.get_counterparty_node_id(); let funding_txo = chan.context.get_funding_txo(); let (monitor_opt, holding_cell_failed_htlcs) = - chan.maybe_free_holding_cell_htlcs(&self.logger); + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger); if !holding_cell_failed_htlcs.is_empty() { failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id)); } if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let update_res = self.chain_monitor.update_channel( - funding_txo.expect("channel is live"), monitor_update); - let update_id = monitor_update.update_id; let channel_id: [u8; 32] = *channel_id; - let res = handle_new_monitor_update!(self, update_res, update_id, + let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, peer_state.channel_by_id.remove(&channel_id)); if res.is_err() { @@ -6247,7 +6480,7 @@ where inflight_htlcs } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { let events = core::cell::RefCell::new(Vec::new()); let event_handler = |event: events::Event| events.borrow_mut().push(event); @@ -6280,7 +6513,7 @@ where /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an /// [`Event`] being handled) completes, this should be called to restore the channel to normal /// operation. It will double-check that nothing *else* is also blocking the same channel from - /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly. + /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly. fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { let mut errors = Vec::new(); loop { @@ -6313,9 +6546,7 @@ where if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); - let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); - let update_id = monitor_update.update_id; - if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, peer_state_lck, peer_state, per_peer_state, chan) { errors.push((e, counterparty_node_id)); @@ -6777,13 +7008,13 @@ where provided_node_features(&self.default_configuration) } - /// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by + /// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by /// [`ChannelManager`]. /// /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice" /// or not. Thus, this method is not public. #[cfg(any(feature = "_test_utils", test))] - pub fn invoice_features(&self) -> InvoiceFeatures { + pub fn invoice_features(&self) -> Bolt11InvoiceFeatures { provided_invoice_features(&self.default_configuration) } @@ -7036,6 +7267,7 @@ where inbound_v1_channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, @@ -7062,37 +7294,20 @@ where log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); let per_peer_state = self.per_peer_state.read().unwrap(); - for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|_, chan| { - let retain = if chan.context.get_counterparty_node_id() == *counterparty_node_id { - if !chan.context.have_received_message() { - // If we created this (outbound) channel while we were disconnected from the - // peer we probably failed to send the open_channel message, which is now - // lost. We can't have had anything pending related to this channel, so we just - // drop it. - false - } else { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), - }); - true - } - } else { true }; - if retain && chan.context.get_counterparty_node_id() != *counterparty_node_id { - if let Some(msg) = chan.get_signed_channel_announcement(&self.node_signer, self.genesis_hash.clone(), self.best_block.read().unwrap().height(), &self.default_configuration) { - if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) { - pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement { - node_id: *counterparty_node_id, - msg, update_msg, - }); - } - } - } - retain + + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash) we don't need to bother closing unfunded channels and + // clearing their maps here. Instead we can just send queue channel_reestablish messages for + // channels in the channel_by_id map. + peer_state.channel_by_id.iter_mut().for_each(|(_, chan)| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); } //TODO: Also re-broadcast announcement_signatures @@ -7126,7 +7341,7 @@ where let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) { - if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) { + if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) { peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: *counterparty_node_id, msg, @@ -7214,13 +7429,13 @@ pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures { provided_init_features(config).to_context() } -/// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by +/// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by /// [`ChannelManager`]. /// /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice" /// or not. Thus, this method is not public. #[cfg(any(feature = "_test_utils", test))] -pub(crate) fn provided_invoice_features(config: &UserConfig) -> InvoiceFeatures { +pub(crate) fn provided_invoice_features(config: &UserConfig) -> Bolt11InvoiceFeatures { provided_init_features(config).to_context() } @@ -7238,7 +7453,7 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType /// Fetches the set of [`InitFeatures`] flags which are provided by or required by /// [`ChannelManager`]. -pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { +pub fn provided_init_features(config: &UserConfig) -> InitFeatures { // Note that if new features are added here which other peers may (eventually) require, we // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. @@ -7254,11 +7469,8 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { features.set_channel_type_optional(); features.set_scid_privacy_optional(); features.set_zero_conf_optional(); - #[cfg(anchors)] - { // Attributes are not allowed on if expressions on our current MSRV of 1.41. - if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { - features.set_anchors_zero_fee_htlc_tx_optional(); - } + if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx { + features.set_anchors_zero_fee_htlc_tx_optional(); } features } @@ -7315,6 +7527,7 @@ impl Writeable for ChannelDetails { (35, self.inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, self.feerate_sat_per_1000_weight, option), + (41, self.channel_shutdown_state, option), }); Ok(()) } @@ -7352,6 +7565,7 @@ impl Readable for ChannelDetails { (35, inbound_htlc_maximum_msat, option), (37, user_channel_id_high_opt, option), (39, feerate_sat_per_1000_weight, option), + (41, channel_shutdown_state, option), }); // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with @@ -7387,12 +7601,13 @@ impl Readable for ChannelDetails { inbound_htlc_minimum_msat, inbound_htlc_maximum_msat, feerate_sat_per_1000_weight, + channel_shutdown_state, }) } } impl_writeable_tlv_based!(PhantomRouteHints, { - (2, channels, vec_type), + (2, channels, required_vec), (4, phantom_scid, required), (6, real_node_pubkey, required), }); @@ -7584,7 +7799,7 @@ impl Readable for HTLCSource { 0 => { let mut session_priv: crate::util::ser::RequiredWrapper = crate::util::ser::RequiredWrapper(None); let mut first_hop_htlc_msat: u64 = 0; - let mut path_hops: Option> = Some(Vec::new()); + let mut path_hops = Vec::new(); let mut payment_id = None; let mut payment_params: Option = None; let mut blinded_tail: Option = None; @@ -7592,7 +7807,7 @@ impl Readable for HTLCSource { (0, session_priv, required), (1, payment_id, option), (2, first_hop_htlc_msat, required), - (4, path_hops, vec_type), + (4, path_hops, required_vec), (5, payment_params, (option: ReadableArgs, 0)), (6, blinded_tail, option), }); @@ -7601,7 +7816,7 @@ impl Readable for HTLCSource { // instead. payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref())); } - let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail }; + let path = Path { hops: path_hops, blinded_tail }; if path.hops.len() == 0 { return Err(DecodeError::InvalidValue); } @@ -7636,7 +7851,7 @@ impl Writeable for HTLCSource { (1, payment_id_opt, option), (2, first_hop_htlc_msat, required), // 3 was previously used to write a PaymentSecret for the payment. - (4, path.hops, vec_type), + (4, path.hops, required_vec), (5, None::, option), // payment_params in LDK versions prior to 0.0.115 (6, path.blinded_tail, option), }); @@ -7867,6 +8082,16 @@ where pending_claiming_payments = None; } + let mut in_flight_monitor_updates: Option>> = None; + for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) { + for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() { + if !updates.is_empty() { + if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); } + in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates); + } + } + } + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -7876,7 +8101,8 @@ where (6, monitor_update_blocked_actions_per_peer, option), (7, self.fake_scid_rand_bytes, required), (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), - (9, htlc_purposes, vec_type), + (9, htlc_purposes, required_vec), + (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), }); @@ -7926,6 +8152,14 @@ impl Readable for VecDeque<(Event, Option)> { } } +impl_writeable_tlv_based_enum!(ChannelShutdownState, + (0, NotShuttingDown) => {}, + (2, ShutdownInitiated) => {}, + (4, ResolvingHTLCs) => {}, + (6, NegotiatingClosingFee) => {}, + (8, ShutdownComplete) => {}, ; +); + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -8093,7 +8327,7 @@ where let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = VecDeque::new(); - let mut pending_background_events = Vec::new(); + let mut close_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config) @@ -8101,17 +8335,7 @@ where let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { - // If the channel is ahead of the monitor, return InvalidValue: - log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); - log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); - log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); - log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); - log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); - return Err(DecodeError::InvalidValue); - } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() { @@ -8122,7 +8346,7 @@ where log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update }); } @@ -8155,7 +8379,6 @@ where log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(), monitor.get_latest_update_id()); - channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } @@ -8202,7 +8425,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -8231,20 +8454,27 @@ where claimable_htlcs_list.push((payment_hash, previous_hops)); } - let peer_count: u64 = Readable::read(reader)?; - let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); - for _ in 0..peer_count { - let peer_pubkey = Readable::read(reader)?; - let peer_state = PeerState { - channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), + let peer_state_from_chans = |channel_by_id| { + PeerState { + channel_by_id, outbound_v1_channel_by_id: HashMap::new(), inbound_v1_channel_by_id: HashMap::new(), - latest_features: Readable::read(reader)?, + latest_features: InitFeatures::empty(), pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), monitor_update_blocked_actions: BTreeMap::new(), actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, - }; + } + }; + + let peer_count: u64 = Readable::read(reader)?; + let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); + for _ in 0..peer_count { + let peer_pubkey = Readable::read(reader)?; + let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); + let mut peer_state = peer_state_from_chans(peer_chans); + peer_state.latest_features = Readable::read(reader)?; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -8272,24 +8502,6 @@ where } } - for (node_id, peer_mtx) in per_peer_state.iter() { - let peer_state = peer_mtx.lock().unwrap(); - for (_, chan) in peer_state.channel_by_id.iter() { - for update in chan.uncompleted_unblocked_mon_updates() { - if let Some(funding_txo) = chan.context.get_funding_txo() { - log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", - update.update_id, log_bytes!(funding_txo.to_channel_id())); - pending_background_events.push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id: *node_id, funding_txo, update: update.clone(), - }); - } else { - return Err(DecodeError::InvalidValue); - } - } - } - } - let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; @@ -8326,6 +8538,7 @@ where let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; + let mut in_flight_monitor_updates: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -8335,7 +8548,8 @@ where (6, monitor_update_blocked_actions_per_peer, option), (7, fake_scid_rand_bytes, option), (8, events_override, option), - (9, claimable_htlc_purposes, vec_type), + (9, claimable_htlc_purposes, optional_vec), + (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), }); @@ -8369,6 +8583,118 @@ where retry_lock: Mutex::new(()) }; + // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`) + // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to + // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we + // replayed, and for each monitor update we have to replay we have to ensure there's a + // `ChannelMonitor` for it. + // + // In order to do so we first walk all of our live channels (so that we can check their + // state immediately after doing the update replays, when we have the `update_id`s + // available) and then walk any remaining in-flight updates. + // + // Because the actual handling of the in-flight updates is the same, it's macro'ized here: + let mut pending_background_events = Vec::new(); + macro_rules! handle_in_flight_updates { + ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr, + $monitor: expr, $peer_state: expr, $channel_info_log: expr + ) => { { + let mut max_in_flight_update_id = 0; + $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id()); + for update in $chan_in_flight_upds.iter() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", + update.update_id, $channel_info_log, log_bytes!($funding_txo.to_channel_id())); + max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: $counterparty_node_id, + funding_txo: $funding_txo, + update: update.clone(), + }); + } + if $chan_in_flight_upds.is_empty() { + // We had some updates to apply, but it turns out they had completed before we + // were serialized, we just weren't notified of that. Thus, we may have to run + // the completion actions for any monitor updates, but otherwise are done. + pending_background_events.push( + BackgroundEvent::MonitorUpdatesComplete { + counterparty_node_id: $counterparty_node_id, + channel_id: $funding_txo.to_channel_id(), + }); + } + if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { + log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!"); + return Err(DecodeError::InvalidValue); + } + max_in_flight_update_id + } } + } + + for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() { + let mut peer_state_lock = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_, chan) in peer_state.channel_by_id.iter() { + // Channels that were persisted have to be funded, otherwise they should have been + // discarded. + let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; + let monitor = args.channel_monitors.get(&funding_txo) + .expect("We already checked for monitor presence when loading channels"); + let mut max_in_flight_update_id = monitor.get_latest_update_id(); + if let Some(in_flight_upds) = &mut in_flight_monitor_updates { + if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { + max_in_flight_update_id = cmp::max(max_in_flight_update_id, + handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, + funding_txo, monitor, peer_state, "")); + } + } + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { + // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", + log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + if let Some(in_flight_upds) = in_flight_monitor_updates { + for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds { + if let Some(monitor) = args.channel_monitors.get(&funding_txo) { + // Now that we've removed all the in-flight monitor updates for channels that are + // still open, we need to replay any monitor updates that are for closed channels, + // creating the neccessary peer_state entries as we go. + let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| { + Mutex::new(peer_state_from_chans(HashMap::new())) + }); + let mut peer_state = peer_state_mutex.lock().unwrap(); + handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, + funding_txo, monitor, peer_state, "closed "); + } else { + log_error!(args.logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is missing.", + log_bytes!(funding_txo.to_channel_id())); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } + } + + // Note that we have to do the above replays before we push new monitor updates. + pending_background_events.append(&mut close_background_events); + + // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we + // should ensure we try them again on the inbound edge. We put them here and do so after we + // have a fully-constructed `ChannelManager` at the end. + let mut pending_claims_to_replay = Vec::new(); + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -8379,7 +8705,8 @@ where // We only rebuild the pending payments map if we were most recently serialized by // 0.0.102+ for (_, monitor) in args.channel_monitors.iter() { - if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { + let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()); + if counterparty_opt.is_none() { for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source { if path.hops.is_empty() { @@ -8473,6 +8800,33 @@ where } } } + + // Whether the downstream channel was closed or not, try to re-apply any payment + // preimages from it which may be needed in upstream channels for forwarded + // payments. + let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs() + .into_iter() + .filter_map(|(htlc_source, (htlc, preimage_opt))| { + if let HTLCSource::PreviousHopData(_) = htlc_source { + if let Some(payment_preimage) = preimage_opt { + Some((htlc_source, payment_preimage, htlc.amount_msat, + // Check if `counterparty_opt.is_none()` to see if the + // downstream chan is closed (because we don't have a + // channel_id -> peer map entry). + counterparty_opt.is_none(), + monitor.get_funding_txo().0.to_channel_id())) + } else { None } + } else { + // If it was an outbound payment, we've handled it above - if a preimage + // came in and we persisted the `ChannelManager` we either handled it and + // are good to go or the channel force-closed - we don't have to handle the + // channel still live case here. + None + } + }); + for tuple in outbound_claimed_htlcs_iter { + pending_claims_to_replay.push(tuple); + } } } @@ -8660,6 +9014,12 @@ where blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates .entry(blocked_channel_outpoint.to_channel_id()) .or_insert_with(Vec::new).push(blocking_action.clone()); + } else { + // If the channel we were blocking has closed, we don't need to + // worry about it - the blocked monitor update should never have + // been released from the `Channel` object so it can't have + // completed, and if the channel closed there's no reason to bother + // anymore. } } } @@ -8705,7 +9065,6 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), - #[cfg(debug_assertions)] background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), @@ -8724,6 +9083,14 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } + for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay { + // We use `downstream_closed` in place of `from_onchain` here just as a guess - we + // don't remember in the `ChannelMonitor` where we got a preimage from, but if the + // channel is closed we just assume that it probably came from an on-chain claim. + channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), + downstream_closed, downstream_chan_id); + } + //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. @@ -8741,7 +9108,7 @@ mod tests { use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, RecipientOnionFields, InterceptId}; use crate::ln::functional_test_utils::*; - use crate::ln::msgs; + use crate::ln::msgs::{self, ErrorAction}; use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; @@ -9737,7 +10104,50 @@ mod tests { sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok()); } - #[cfg(anchors)] + #[test] + fn test_inbound_anchors_manual_acceptance() { + // Tests that we properly limit inbound channels when we have the manual-channel-acceptance + // flag set and (sometimes) accept channels as 0conf. + let mut anchors_cfg = test_default_channel_config(); + anchors_cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + + let mut anchors_manual_accept_cfg = anchors_cfg.clone(); + anchors_manual_accept_cfg.manually_accept_inbound_channels = true; + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, + &[Some(anchors_cfg.clone()), Some(anchors_cfg.clone()), Some(anchors_manual_accept_cfg.clone())]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); + + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + match &msg_events[0] { + MessageSendEvent::HandleError { node_id, action } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + match action { + ErrorAction::SendErrorMessage { msg } => + assert_eq!(msg.data, "No channels with anchor outputs accepted".to_owned()), + _ => panic!("Unexpected error action"), + } + } + _ => panic!("Unexpected event"), + } + + nodes[2].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + let events = nodes[2].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => + nodes[2].node.accept_inbound_channel(&temporary_channel_id, &nodes[0].node.get_our_node_id(), 23).unwrap(), + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[2], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + } + #[test] fn test_anchors_zero_fee_htlc_tx_fallback() { // Tests that if both nodes support anchors, but the remote node does not want to accept @@ -9827,6 +10237,25 @@ mod tests { MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("expected BroadcastChannelUpdate event"), } + + // If we provide a channel_id not associated with the peer, we should get an error and no updates + // should be applied to ensure update atomicity as specified in the API docs. + let bad_channel_id = [10; 32]; + let current_fee = nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths; + let new_fee = current_fee + 100; + assert!( + matches!( + nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id, bad_channel_id], &ChannelConfigUpdate { + forwarding_fee_proportional_millionths: Some(new_fee), + ..Default::default() + }), + Err(APIError::ChannelUnavailable { err: _ }), + ) + ); + // Check that the fee hasn't changed for the channel that exists. + assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths, current_fee); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 0); } } @@ -9842,7 +10271,7 @@ pub mod bench { use crate::routing::gossip::NetworkGraph; use crate::routing::router::{PaymentParameters, RouteParameters}; use crate::util::test_utils; - use crate::util::config::UserConfig; + use crate::util::config::{UserConfig, MaxDustHTLCExposure}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; @@ -9889,6 +10318,7 @@ pub mod bench { let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); + config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253); config.channel_handshake_config.minimum_depth = 1; let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a);