X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=45f45e138629d7dc6ddf2ed27b04d45e55e174ed;hb=d5fb804a329e36ac41e1b85e3fedce8d5dc40e6b;hp=59d8e387529412a5d0dcd8adc61050791d8643fe;hpb=f78448c604e3859d6c387d253549474e667e9527;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 59d8e387..45f45e13 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -70,7 +70,7 @@ use crate::prelude::*; use core::{cmp, mem}; use core::cell::RefCell; use crate::io::Read; -use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock}; +use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState}; use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; use core::ops::Deref; @@ -487,6 +487,22 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// The peer is currently connected (i.e. we've seen a + /// [`ChannelMessageHandler::peer_connected`] and no corresponding + /// [`ChannelMessageHandler::peer_disconnected`]. + is_connected: bool, +} + +impl PeerState { + /// Indicates that a peer meets the criteria where we're ok to remove it from our storage. + /// If true is passed for `require_disconnected`, the function will return false if we haven't + /// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`. + fn ok_to_remove(&self, require_disconnected: bool) -> bool { + if require_disconnected && self.is_connected { + return false + } + self.channel_by_id.len() == 0 + } } /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is @@ -577,6 +593,15 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C /// offline for a full minute. In order to track this, you must call /// timer_tick_occurred roughly once per minute, though it doesn't have to be perfect. /// +/// To avoid trivial DoS issues, ChannelManager limits the number of inbound connections and +/// inbound channels without confirmed funding transactions. This may result in nodes which we do +/// not have a channel with being unable to connect to us or open new channels with us if we have +/// many peers with unfunded channels. +/// +/// Because it is an indication of trust, inbound channels which we've accepted as 0conf are +/// exempted from the count of unfunded channels. Similarly, outbound channels and connections are +/// never limited. Please ensure you limit the count of such channels yourself. +/// /// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager /// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but /// essentially you should default to using a SimpleRefChannelManager, and use a @@ -763,9 +788,8 @@ where /// very far in the past, and can only ever be up to two hours in the future. highest_seen_timestamp: AtomicUsize, - /// The bulk of our storage will eventually be here (message queues and the like). Currently - /// the `per_peer_state` stores our channels on a per-peer basis, as well as the peer's latest - /// features. + /// The bulk of our storage. Currently the `per_peer_state` stores our channels on a per-peer + /// basis, as well as the peer's latest features. /// /// If we are connected to a peer we always at least have an entry here, even if no channels /// are currently open with that peer. @@ -928,6 +952,19 @@ pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3; /// [`OutboundPayments::remove_stale_resolved_payments`]. pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7; +/// The maximum number of unfunded channels we can have per-peer before we start rejecting new +/// (inbound) ones. The number of peers with unfunded channels is limited separately in +/// [`MAX_UNFUNDED_CHANNEL_PEERS`]. +const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4; + +/// The maximum number of peers from which we will allow pending unfunded channels. Once we reach +/// this many peers we reject new (inbound) channels from peers with which we don't have a channel. +const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50; + +/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this +/// many peers we reject new (inbound) connections. +const MAX_NO_CHANNEL_PEERS: usize = 250; + /// Information needed for constructing an invoice route hint for this channel. #[derive(Clone, Debug, PartialEq)] pub struct CounterpartyForwardingInfo { @@ -1175,9 +1212,9 @@ pub enum RecentPaymentDetails { /// made before LDK version 0.0.104. payment_hash: Option, }, - /// After a payment is explicitly abandoned by calling [`ChannelManager::abandon_payment`], it - /// is marked as abandoned until an [`Event::PaymentFailed`] is generated. A payment could also - /// be marked as abandoned if pathfinding fails repeatedly or retries have been exhausted. + /// After a payment's retries are exhausted per the provided [`Retry`], or it is explicitly + /// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all + /// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated. Abandoned { /// Hash of the payment that we have given up trying to send. payment_hash: PaymentHash, @@ -1203,13 +1240,10 @@ macro_rules! handle_error { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => { - #[cfg(any(feature = "_test_utils", test))] - { - // In testing, ensure there are no deadlocks where the lock is already held upon - // entering the macro. - debug_assert!($self.pending_events.try_lock().is_ok()); - debug_assert!($self.per_peer_state.try_write().is_ok()); - } + // In testing, ensure there are no deadlocks where the lock is already held upon + // entering the macro. + debug_assert_ne!($self.pending_events.held_by_thread(), LockHeldState::HeldByThread); + debug_assert_ne!($self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); let mut msg_events = Vec::with_capacity(2); @@ -1243,26 +1277,6 @@ macro_rules! handle_error { let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.append(&mut msg_events); } - #[cfg(any(feature = "_test_utils", test))] - { - if let None = per_peer_state.get(&$counterparty_node_id) { - // This shouldn't occour in tests unless an unkown counterparty_node_id - // has been passed to our message handling functions. - let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", $counterparty_node_id); - match err.action { - msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { ref channel_id, ref data } - } - => { - assert_eq!(*data, expected_error_str); - if let Some((err_channel_id, _user_channel_id)) = chan_id { - debug_assert_eq!(*channel_id, err_channel_id); - } - } - _ => debug_assert!(false, "Unexpected event"), - } - } - } } // Return error in case higher-API need one @@ -1592,12 +1606,10 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&their_network_key); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Not connected to node: {}", their_network_key) }); - } + let peer_state_mutex = per_peer_state.get(&their_network_key) + .ok_or_else(|| APIError::APIMisuseError{ err: format!("Not connected to node: {}", their_network_key) })?; - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state = peer_state_mutex.lock().unwrap(); let channel = { let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); let their_features = &peer_state.latest_features; @@ -1635,14 +1647,13 @@ where } fn list_channels_with_filter::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec { - let mut res = Vec::new(); // Allocate our best estimate of the number of channels we have in the `res` // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside // of the ChannelMonitor handling. Therefore reallocations may still occur, but is // unlikely as the `short_to_chan_info` map often contains 2 entries for // the same channel. - res.reserve(self.short_to_chan_info.read().unwrap().len()); + let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); { let best_block_height = self.best_block.read().unwrap().height(); let per_peer_state = self.per_peer_state.read().unwrap(); @@ -1726,7 +1737,7 @@ where /// /// This can be useful for payments that may have been prepared, but ultimately not sent, as a /// result of a crash. If such a payment exists, is not listed here, and an - /// [`Event::PaymentSent`] has not been received, you may consider retrying the payment. + /// [`Event::PaymentSent`] has not been received, you may consider resending the payment. /// /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn list_recent_payments(&self) -> Vec { @@ -1772,12 +1783,10 @@ where let result: Result<(), _> = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -1893,12 +1902,10 @@ where fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool) -> Result { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(peer_node_id); + let peer_state_mutex = per_peer_state.get(peer_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; let mut chan = { - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { if let Some(peer_msg) = peer_msg { @@ -1914,7 +1921,7 @@ where log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); self.finish_force_close_channel(chan.force_shutdown(broadcast)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); @@ -2201,7 +2208,7 @@ where let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); @@ -2315,7 +2322,9 @@ where /// public, and thus should be called whenever the result is going to be passed out in a /// [`MessageSendEvent::BroadcastChannelUpdate`] event. /// - /// May be called with peer_state already locked! + /// Note that in `internal_closing_signed`, this function is called without the `peer_state` + /// corresponding to the channel's counterparty locked, as the channel been removed from the + /// storage and the `peer_state` lock has been dropped. fn get_channel_update_for_broadcast(&self, chan: &Channel<::Signer>) -> Result { if !chan.should_announce() { return Err(LightningError { @@ -2334,7 +2343,10 @@ where /// is public (only returning an Err if the channel does not yet have an assigned short_id), /// and thus MUST NOT be called unless the recipient of the resulting message has already /// provided evidence that they know about the existence of the channel. - /// May be called with peer_state already locked! + /// + /// Note that through `internal_closing_signed`, this function is called without the + /// `peer_state` corresponding to the channel's counterparty locked, as the channel been + /// removed from the storage and the `peer_state` lock has been dropped. fn get_channel_update_for_unicast(&self, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id())); let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) { @@ -2395,11 +2407,9 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::InvalidRoute{err: "No peer matching the path's first hop found!" }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(&counterparty_node_id) + .ok_or_else(|| APIError::InvalidRoute{err: "No peer matching the path's first hop found!" })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { match { @@ -2484,8 +2494,8 @@ where /// If a pending payment is currently in-flight with the same [`PaymentId`] provided, this /// method will error with an [`APIError::InvalidRoute`]. Note, however, that once a payment /// is no longer pending (either via [`ChannelManager::abandon_payment`], or handling of an - /// [`Event::PaymentSent`]) LDK will not stop you from sending a second payment with the same - /// [`PaymentId`]. + /// [`Event::PaymentSent`] or [`Event::PaymentFailed`]) LDK will not stop you from sending a + /// second payment with the same [`PaymentId`]. /// /// Thus, in order to ensure duplicate payments are not sent, you should implement your own /// tracking of payments, including state to indicate once a payment has completed. Because you @@ -2530,6 +2540,7 @@ where /// [`Route`], we assume the invoice had the basic_mpp feature set. /// /// [`Event::PaymentSent`]: events::Event::PaymentSent + /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { @@ -2546,7 +2557,7 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments .send_payment(payment_hash, payment_secret, payment_id, retry_strategy, route_params, - &self.router, self.list_usable_channels(), self.compute_inflight_htlcs(), + &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2567,48 +2578,25 @@ where } - /// Retries a payment along the given [`Route`]. + /// Signals that no further retries for the given payment should occur. Useful if you have a + /// pending outbound payment with retries remaining, but wish to stop retrying the payment before + /// retries are exhausted. /// - /// Errors returned are a superset of those returned from [`send_payment`], so see - /// [`send_payment`] documentation for more details on errors. This method will also error if the - /// retry amount puts the payment more than 10% over the payment's total amount, if the payment - /// for the given `payment_id` cannot be found (likely due to timeout or success), or if - /// further retries have been disabled with [`abandon_payment`]. - /// - /// [`send_payment`]: [`ChannelManager::send_payment`] - /// [`abandon_payment`]: [`ChannelManager::abandon_payment`] - pub fn retry_payment(&self, route: &Route, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); - self.pending_outbound_payments.retry_payment_with_route(route, payment_id, &self.entropy_source, &self.node_signer, best_block_height, - |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) - } - - /// Signals that no further retries for the given payment will occur. - /// - /// After this method returns, no future calls to [`retry_payment`] for the given `payment_id` - /// are allowed. If no [`Event::PaymentFailed`] event had been generated before, one will be - /// generated as soon as there are no remaining pending HTLCs for this payment. + /// If no [`Event::PaymentFailed`] event had been generated before, one will be generated as soon + /// as there are no remaining pending HTLCs for this payment. /// /// Note that calling this method does *not* prevent a payment from succeeding. You must still /// wait until you receive either a [`Event::PaymentFailed`] or [`Event::PaymentSent`] event to /// determine the ultimate status of a payment. /// /// If an [`Event::PaymentFailed`] event is generated and we restart without this - /// [`ChannelManager`] having been persisted, the payment may still be in the pending state - /// upon restart. This allows further calls to [`retry_payment`] (and requiring a second call - /// to [`abandon_payment`] to mark the payment as failed again). Otherwise, future calls to - /// [`retry_payment`] will fail with [`PaymentSendFailure::ParameterError`]. + /// [`ChannelManager`] having been persisted, another [`Event::PaymentFailed`] may be generated. /// - /// [`abandon_payment`]: Self::abandon_payment - /// [`retry_payment`]: Self::retry_payment /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn abandon_payment(&self, payment_id: PaymentId) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - if let Some(payment_failed_ev) = self.pending_outbound_payments.abandon_payment(payment_id) { - self.pending_events.lock().unwrap().push(payment_failed_ev); - } + self.pending_outbound_payments.abandon_payment(payment_id, &self.pending_events); } /// Send a spontaneous payment, which is a payment that does not require the recipient to have @@ -2646,7 +2634,7 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), - self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, + || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2675,12 +2663,10 @@ where &self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput ) -> Result<(), APIError> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let (chan, msg) = { let (res, chan) = { @@ -2702,7 +2688,7 @@ where (chan, funding_msg) }, Err(_) => { return Err(APIError::ChannelUnavailable { - err: "Error deriving keys or signing initial commitment transactions - either our RNG or our counterparty's RNG is broken or the Signer refused to sign".to_owned() + err: "Signer refused to sign the initial commitment transaction".to_owned() }) }, } }; @@ -2846,11 +2832,9 @@ where &self.total_consistency_lock, &self.persistence_notifier, ); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for channel_id in channel_ids { if !peer_state.channel_by_id.contains_key(channel_id) { @@ -2903,24 +2887,22 @@ where let next_hop_scid = { let peer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = peer_state_lock.get(&next_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.get(next_hop_channel_id) { - Some(chan) => { - if !chan.is_usable() { - return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) - }) - } - chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) - }, - None => return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) - }) - } - } else { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) }); + let peer_state_mutex = peer_state_lock.get(&next_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.get(next_hop_channel_id) { + Some(chan) => { + if !chan.is_usable() { + return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) + }) + } + chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) + }, + None => return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) + }) } }; @@ -3100,7 +3082,7 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { forwarding_channel_not_found!(); continue; } @@ -3387,7 +3369,8 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(), - || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, + || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, + &self.pending_events, &self.logger, |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)); @@ -3492,6 +3475,7 @@ where /// the channel. /// * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs /// with the current `ChannelConfig`. + /// * Removing peers which have disconnected but and no longer have any channels. /// /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate /// estimate fetches. @@ -3504,19 +3488,21 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); + let mut pending_peers_awaiting_removal = Vec::new(); { let per_peer_state = self.per_peer_state.read().unwrap(); for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; + let counterparty_node_id = *counterparty_node_id; peer_state.channel_by_id.retain(|chan_id, chan| { let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } if let Err(e) = chan.timer_check_closing_negotiation_progress() { let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id); - handle_errors.push((Err(err), *counterparty_node_id)); + handle_errors.push((Err(err), counterparty_node_id)); if needs_close { return false; } } @@ -3550,6 +3536,36 @@ where true }); + if peer_state.ok_to_remove(true) { + pending_peers_awaiting_removal.push(counterparty_node_id); + } + } + } + + // When a peer disconnects but still has channels, the peer's `peer_state` entry in the + // `per_peer_state` is not removed by the `peer_disconnected` function. If the channels + // of to that peer is later closed while still being disconnected (i.e. force closed), + // we therefore need to remove the peer from `peer_state` separately. + // To avoid having to take the `per_peer_state` `write` lock once the channels are + // closed, we instead remove such peers awaiting removal here on a timer, to limit the + // negative effects on parallelism as much as possible. + if pending_peers_awaiting_removal.len() > 0 { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + for counterparty_node_id in pending_peers_awaiting_removal { + match per_peer_state.entry(counterparty_node_id) { + hash_map::Entry::Occupied(entry) => { + // Remove the entry if the peer is still disconnected and we still + // have no channels to the peer. + let remove_entry = { + let peer_state = entry.get().lock().unwrap(); + peer_state.ok_to_remove(true) + }; + if remove_entry { + entry.remove_entry(); + } + }, + hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ } + } } } @@ -3725,17 +3741,12 @@ where /// Fails an HTLC backwards to the sender of it to us. /// Note that we do not assume that channels corresponding to failed HTLCs are still available. fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { - #[cfg(any(feature = "_test_utils", test))] - { - // Ensure that no peer state channel storage lock is not held when calling this - // function. - // This ensures that future code doesn't introduce a lock_order requirement for - // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling - // this function with any `per_peer_state` peer lock aquired would. - let per_peer_state = self.per_peer_state.read().unwrap(); - for (_, peer) in per_peer_state.iter() { - debug_assert!(peer.try_lock().is_ok()); - } + // Ensure that no peer state channel storage lock is held when calling this function. + // This ensures that future code doesn't introduce a lock-order requirement for + // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling + // this function with any `per_peer_state` peer lock acquired would. + for (_, peer) in self.per_peer_state.read().unwrap().iter() { + debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); } //TODO: There is a timing attack here where if a node fails an HTLC back to us they can @@ -3748,16 +3759,19 @@ where // being fully configured. See the docs for `ChannelManagerReadArgs` for more. match source { HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, ref payment_params, .. } => { - self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx, &self.pending_events, &self.logger); + if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, + session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx, + &self.pending_events, &self.logger) + { self.push_pending_forwards_ev(); } }, HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, ref phantom_shared_secret, ref outpoint }) => { log_trace!(self.logger, "Failing HTLC with payment_hash {} backwards from us with {:?}", log_bytes!(payment_hash.0), onion_error); let err_packet = onion_error.get_encrypted_failure_packet(incoming_packet_shared_secret, phantom_shared_secret); - let mut forward_event = None; + let mut push_forward_ev = false; let mut forward_htlcs = self.forward_htlcs.lock().unwrap(); if forward_htlcs.is_empty() { - forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS)); + push_forward_ev = true; } match forward_htlcs.entry(*short_channel_id) { hash_map::Entry::Occupied(mut entry) => { @@ -3768,12 +3782,8 @@ where } } mem::drop(forward_htlcs); + if push_forward_ev { self.push_pending_forwards_ev(); } let mut pending_events = self.pending_events.lock().unwrap(); - if let Some(time) = forward_event { - pending_events.push(events::Event::PendingHTLCsForwardable { - time_forwardable: time - }); - } pending_events.push(events::Event::HTLCHandlingFailed { prev_channel_id: outpoint.to_channel_id(), failed_next_destination: destination, @@ -3850,7 +3860,7 @@ where let mut expected_amt_msat = None; let mut valid_mpp = true; let mut errs = Vec::new(); - let mut per_peer_state = Some(self.per_peer_state.read().unwrap()); + let per_peer_state = self.per_peer_state.read().unwrap(); for htlc in sources.iter() { let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), @@ -3860,16 +3870,16 @@ where } }; - if let None = per_peer_state.as_ref().unwrap().get(&counterparty_node_id) { + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { valid_mpp = false; break; } - let peer_state_mutex = per_peer_state.as_ref().unwrap().get(&counterparty_node_id).unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let None = peer_state.channel_by_id.get(&chan_id) { + if peer_state.channel_by_id.get(&chan_id).is_none() { valid_mpp = false; break; } @@ -3895,14 +3905,13 @@ where claimable_amt_msat += htlc.value; } + mem::drop(per_peer_state); if sources.is_empty() || expected_amt_msat.is_none() { - mem::drop(per_peer_state); self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); log_info!(self.logger, "Attempted to claim an incomplete payment which no longer had any available HTLCs!"); return; } if claimable_amt_msat != expected_amt_msat.unwrap() { - mem::drop(per_peer_state); self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); log_info!(self.logger, "Attempted to claim an incomplete payment, expected {} msat, had {} available to claim.", expected_amt_msat.unwrap(), claimable_amt_msat); @@ -3910,8 +3919,7 @@ where } if valid_mpp { for htlc in sources.drain(..) { - if per_peer_state.is_none() { per_peer_state = Some(self.per_peer_state.read().unwrap()); } - if let Err((pk, err)) = self.claim_funds_from_hop(per_peer_state.take().unwrap(), + if let Err((pk, err)) = self.claim_funds_from_hop( htlc.prev_hop, payment_preimage, |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })) { @@ -3923,7 +3931,6 @@ where } } } - mem::drop(per_peer_state); if !valid_mpp { for htlc in sources.drain(..) { let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec(); @@ -3944,11 +3951,11 @@ where } fn claim_funds_from_hop) -> Option>(&self, - per_peer_state_lock: RwLockReadGuard::Signer>>>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc) -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! + let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { @@ -3956,83 +3963,76 @@ where None => None }; - let (found_channel, mut peer_state_opt) = if counterparty_node_id_opt.is_some() && per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).is_some() { - let peer_mutex = per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).unwrap(); - let peer_state = peer_mutex.lock().unwrap(); - let found_channel = peer_state.channel_by_id.contains_key(&chan_id); - (found_channel, Some(peer_state)) - } else { (false, None) }; - - if found_channel { - let peer_state = &mut *peer_state_opt.as_mut().unwrap(); - if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { - let counterparty_node_id = chan.get().get_counterparty_node_id(); - match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { - Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, - "Failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, e); - let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); - mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - return Err((counterparty_node_id, err)); - } - } - if let Some((msg, commitment_signed)) = msgs { - log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", - log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - } - }); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - Ok(()) - } else { - Ok(()) - } - }, - Err((e, monitor_update)) => { + let mut peer_state_opt = counterparty_node_id_opt.as_ref().map( + |counterparty_node_id| per_peer_state.get(counterparty_node_id).map( + |peer_mutex| peer_mutex.lock().unwrap() + ) + ).unwrap_or(None); + + if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id)) + { + let counterparty_node_id = chan.get().get_counterparty_node_id(); + match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { + Ok(msgs_monitor_option) => { + if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { ChannelMonitorUpdateStatus::Completed => {}, e => { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same update and try - // again on restart. - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, - "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, + "Failed to update channel monitor with preimage {:?}: {:?}", payment_preimage, e); - }, + let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); + mem::drop(peer_state_opt); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + return Err((counterparty_node_id, err)); + } } - let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); - if drop { - chan.remove_entry(); + if let Some((msg, commitment_signed)) = msgs { + log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", + log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); + peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: counterparty_node_id, + updates: msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: vec![msg], + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + } + }); } mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(None)); - Err((counterparty_node_id, res)) - }, - } - } else { - // We've held the peer_state mutex since finding the channel and setting - // found_channel to true, so the channel can't have been dropped. - unreachable!() + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + Ok(()) + } else { + Ok(()) + } + }, + Err((e, monitor_update)) => { + match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { + ChannelMonitorUpdateStatus::Completed => {}, + e => { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same update and try + // again on restart. + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, + "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", + payment_preimage, e); + }, + } + let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); + if drop { + chan.remove_entry(); + } + mem::drop(peer_state_opt); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(None)); + Err((counterparty_node_id, res)) + }, } } else { let preimage_update = ChannelMonitorUpdate { @@ -4053,7 +4053,7 @@ where payment_preimage, update_res); } mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); + mem::drop(per_peer_state); // Note that we do process the completion action here. This totally could be a // duplicate claim, but we have no way of knowing without interrogating the // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are @@ -4075,7 +4075,7 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; - let res = self.claim_funds_from_hop(self.per_peer_state.read().unwrap(), hop_data, payment_preimage, + let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat { @@ -4207,7 +4207,7 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock; let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { return } + if peer_state_mutex_opt.is_none() { return } peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; let mut channel = { @@ -4296,13 +4296,13 @@ where fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty()); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let is_only_peer_channel = peer_state.channel_by_id.len() == 1; match peer_state.channel_by_id.entry(temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut channel) => { if !channel.get().inbound_is_awaiting_accept() { @@ -4320,6 +4320,21 @@ where peer_state.pending_msg_events.push(send_msg_err_event); let _ = remove_channel!(self, channel); return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() }); + } else { + // If this peer already has some channels, a new channel won't increase our number of peers + // with unfunded channels, so as long as we aren't over the maximum number of unfunded + // channels per-peer we can accept channels from a peer with existing ones. + if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS { + let send_msg_err_event = events::MessageSendEvent::HandleError { + node_id: channel.get().get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage{ + msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), } + } + }; + peer_state.pending_msg_events.push(send_msg_err_event); + let _ = remove_channel!(self, channel); + return Err(APIError::APIMisuseError { err: "Too many peers with unfunded channels, refusing to accept new ones".to_owned() }); + } } peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { @@ -4334,6 +4349,43 @@ where Ok(()) } + /// Gets the number of peers which match the given filter and do not have any funded, outbound, + /// or 0-conf channels. + /// + /// The filter is called for each peer and provided with the number of unfunded, inbound, and + /// non-0-conf channels we have with the peer. + fn peers_without_funded_channels(&self, maybe_count_peer: Filter) -> usize + where Filter: Fn(&PeerState<::Signer>) -> bool { + let mut peers_without_funded_channels = 0; + let best_block_height = self.best_block.read().unwrap().height(); + { + let peer_state_lock = self.per_peer_state.read().unwrap(); + for (_, peer_mtx) in peer_state_lock.iter() { + let peer = peer_mtx.lock().unwrap(); + if !maybe_count_peer(&*peer) { continue; } + let num_unfunded_channels = Self::unfunded_channel_count(&peer, best_block_height); + if num_unfunded_channels == peer.channel_by_id.len() { + peers_without_funded_channels += 1; + } + } + } + return peers_without_funded_channels; + } + + fn unfunded_channel_count( + peer: &PeerState<::Signer>, best_block_height: u32 + ) -> usize { + let mut num_unfunded_channels = 0; + for (_, chan) in peer.channel_by_id.iter() { + if !chan.is_outbound() && chan.minimum_depth().unwrap_or(1) != 0 && + chan.get_funding_tx_confirmations(best_block_height) == 0 + { + num_unfunded_channels += 1; + } + } + num_unfunded_channels + } + fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); @@ -4346,18 +4398,44 @@ where let mut random_bytes = [0u8; 16]; random_bytes.copy_from_slice(&self.entropy_source.get_secure_random_bytes()[..16]); let user_channel_id = u128::from_be_bytes(random_bytes); - let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); + + // Get the number of peers with channels, but without funded ones. We don't care too much + // about peers that never open a channel, so we filter by peers that have at least one + // channel, and then limit the number of those with unfunded channels. + let channeled_peers_without_funding = self.peers_without_funded_channels(|node| !node.channel_by_id.is_empty()); + let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone())) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone()) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + + // If this peer already has some channels, a new channel won't increase our number of peers + // with unfunded channels, so as long as we aren't over the maximum number of unfunded + // channels per-peer we can accept channels from a peer with existing ones. + if peer_state.channel_by_id.is_empty() && + channeled_peers_without_funding >= MAX_UNFUNDED_CHANNEL_PEERS && + !self.default_configuration.manually_accept_inbound_channels + { + return Err(MsgHandleErrInternal::send_err_msg_no_close( + "Have too many peers with unfunded channels, not accepting new ones".to_owned(), + msg.temporary_channel_id.clone())); + } + + let best_block_height = self.best_block.read().unwrap().height(); + if Self::unfunded_channel_count(peer_state, best_block_height) >= MAX_UNFUNDED_CHANS_PER_PEER { + return Err(MsgHandleErrInternal::send_err_msg_no_close( + format!("Refusing more than {} unfunded channels.", MAX_UNFUNDED_CHANS_PER_PEER), + msg.temporary_channel_id.clone())); + } + let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider, - counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration, - self.best_block.read().unwrap().height(), &self.logger, outbound_scid_alias) + counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, + &self.default_configuration, best_block_height, &self.logger, outbound_scid_alias) { Err(e) => { self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias); @@ -4401,11 +4479,12 @@ where fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4428,13 +4507,14 @@ where fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) + })?; let ((funding_msg, monitor, mut channel_ready), mut chan) = { let best_block = *self.best_block.read().unwrap(); - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4471,7 +4551,7 @@ where // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the // peer exists, despite the inner PeerState potentially having no channels after removing // the channel above. - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { @@ -4506,12 +4586,13 @@ where let funding_tx = { let best_block = *self.best_block.read().unwrap(); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4549,11 +4630,12 @@ where fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4592,11 +4674,12 @@ where let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -4645,12 +4728,13 @@ where fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; let (tx, chan_option) = { - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -4679,7 +4763,7 @@ where } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -4702,11 +4786,12 @@ where let pending_forward_info = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4743,11 +4828,12 @@ where fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let (htlc_source, forwarded_htlc_value) = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4762,11 +4848,12 @@ where fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4779,11 +4866,12 @@ where fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4800,11 +4888,12 @@ where fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4850,7 +4939,7 @@ where #[inline] fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) { for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards { - let mut forward_event = None; + let mut push_forward_event = false; let mut new_intercept_events = Vec::new(); let mut failed_intercept_forwards = Vec::new(); if !pending_forwards.is_empty() { @@ -4908,7 +4997,7 @@ where // We don't want to generate a PendingHTLCsForwardable event if only intercepted // payments are being processed. if forward_htlcs_empty { - forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS)); + push_forward_event = true; } entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info }))); @@ -4926,16 +5015,21 @@ where let mut events = self.pending_events.lock().unwrap(); events.append(&mut new_intercept_events); } + if push_forward_event { self.push_pending_forwards_ev() } + } + } - match forward_event { - Some(time) => { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::PendingHTLCsForwardable { - time_forwardable: time - }); - } - None => {}, - } + // We only want to push a PendingHTLCsForwardable event if no others are queued. + fn push_pending_forwards_ev(&self) { + let mut pending_events = self.pending_events.lock().unwrap(); + let forward_ev_exists = pending_events.iter() + .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) + .is_some(); + if !forward_ev_exists { + pending_events.push(events::Event::PendingHTLCsForwardable { + time_forwardable: + Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), + }); } } @@ -4943,11 +5037,12 @@ where let mut htlcs_to_fail = Vec::new(); let res = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5008,11 +5103,12 @@ where fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5025,11 +5121,12 @@ where fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5044,7 +5141,7 @@ where ), chan), // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()), }); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5063,7 +5160,7 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { return Ok(NotifyOption::SkipPersist) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); @@ -5098,11 +5195,12 @@ where let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5418,7 +5516,8 @@ where /// [`PaymentHash`] and [`PaymentPreimage`] for you. /// /// The [`PaymentPreimage`] will ultimately be returned to you in the [`PaymentClaimable`], which - /// will have the [`PaymentClaimable::payment_preimage`] field filled in. That should then be + /// will have the [`PaymentClaimable::purpose`] be [`PaymentPurpose::InvoicePayment`] with + /// its [`PaymentPurpose::InvoicePayment::payment_preimage`] field filled in. That should then be /// passed directly to [`claim_funds`]. /// /// See [`create_inbound_payment_for_hash`] for detailed documentation on behavior and requirements. @@ -5438,7 +5537,9 @@ where /// /// [`claim_funds`]: Self::claim_funds /// [`PaymentClaimable`]: events::Event::PaymentClaimable - /// [`PaymentClaimable::payment_preimage`]: events::Event::PaymentClaimable::payment_preimage + /// [`PaymentClaimable::purpose`]: events::Event::PaymentClaimable::purpose + /// [`PaymentPurpose::InvoicePayment`]: events::PaymentPurpose::InvoicePayment + /// [`PaymentPurpose::InvoicePayment::payment_preimage`]: events::PaymentPurpose::InvoicePayment::payment_preimage /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash pub fn create_inbound_payment(&self, min_value_msat: Option, invoice_expiry_delta_secs: u32, min_final_cltv_expiry_delta: Option) -> Result<(PaymentHash, PaymentSecret), ()> { @@ -5717,9 +5818,7 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if peer_state.pending_msg_events.len() > 0 { - let mut peer_pending_events = Vec::new(); - mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events); - pending_events.append(&mut peer_pending_events); + pending_events.append(&mut peer_state.pending_msg_events); } } @@ -5970,7 +6069,7 @@ where msg: announcement, // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(channel).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()), }); } } @@ -6247,14 +6346,13 @@ where let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); } - fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) { + fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); - let mut no_channels_remain = true; let mut per_peer_state = self.per_peer_state.write().unwrap(); - { - log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.", - log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" }); + let remove_peer = { + log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates.", + log_pubkey!(counterparty_node_id)); if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; @@ -6265,8 +6363,6 @@ where update_maps_on_chan_removal!(self, chan); self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer); return false; - } else { - no_channels_remain = false; } true }); @@ -6286,6 +6382,7 @@ where &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, &events::MessageSendEvent::HandleError { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, @@ -6294,9 +6391,12 @@ where &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, } }); - } - } - if no_channels_remain { + debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect"); + peer_state.is_connected = false; + peer_state.ok_to_remove(true) + } else { debug_assert!(false, "Unconnected peer disconnected"); true } + }; + if remove_peer { per_peer_state.remove(counterparty_node_id); } mem::drop(per_peer_state); @@ -6306,34 +6406,56 @@ where } } - fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) -> Result<(), ()> { + fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init, inbound: bool) -> Result<(), ()> { if !init_msg.features.supports_static_remote_key() { - log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(counterparty_node_id)); + log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting", log_pubkey!(counterparty_node_id)); return Err(()); } - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. + let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + { let mut peer_state_lock = self.per_peer_state.write().unwrap(); match peer_state_lock.entry(counterparty_node_id.clone()) { hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + return Err(()); + } e.insert(Mutex::new(PeerState { channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + is_connected: true, })); }, hash_map::Entry::Occupied(e) => { - e.get().lock().unwrap().latest_features = init_msg.features.clone(); + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + return Err(()); + } + + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; }, } } - let per_peer_state = self.per_peer_state.read().unwrap(); + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; @@ -6378,7 +6500,7 @@ where let channel_ids: Vec<[u8; 32]> = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { return; } + if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; peer_state.channel_by_id.keys().cloned().collect() @@ -6392,7 +6514,7 @@ where // First check if we can advance the channel type and try again. let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { return; } + if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) { @@ -6886,6 +7008,7 @@ where best_block.block_hash().write(writer)?; } + let mut serializable_peer_count: u64 = 0; { let per_peer_state = self.per_peer_state.read().unwrap(); let mut unfunded_channels = 0; @@ -6893,6 +7016,9 @@ where for (_, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + if !peer_state.ok_to_remove(false) { + serializable_peer_count += 1; + } number_of_channels += peer_state.channel_by_id.len(); for (_, channel) in peer_state.channel_by_id.iter() { if !channel.is_funding_initiated() { @@ -6943,11 +7069,18 @@ where htlc_purposes.push(purpose); } - (per_peer_state.len() as u64).write(writer)?; + (serializable_peer_count).write(writer)?; for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() { - peer_pubkey.write(writer)?; - let peer_state = peer_state_mutex.lock().unwrap(); - peer_state.latest_features.write(writer)?; + let peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &*peer_state_lock; + // Peers which we have no channels to should be dropped once disconnected. As we + // disconnect all peers when shutting down and serializing the ChannelManager, we + // consider all peers as disconnected here. There's therefore no need write peers with + // no channels. + if !peer_state.ok_to_remove(false) { + peer_pubkey.write(writer)?; + peer_state.latest_features.write(writer)?; + } } let events = self.pending_events.lock().unwrap(); @@ -7346,6 +7479,7 @@ where channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), + is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -7515,7 +7649,8 @@ where } } - if !forward_htlcs.is_empty() { + let pending_outbounds = OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()) }; + if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() { // If we have pending HTLCs to forward, assume we either dropped a // `PendingHTLCsForwardable` or the user received it but never processed it as they // shut down before the timer hit. Either way, set the time_forwardable to a small @@ -7683,7 +7818,7 @@ where inbound_payment_key: expanded_inbound_key, pending_inbound_payments: Mutex::new(pending_inbound_payments), - pending_outbound_payments: OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()) }, + pending_outbound_payments: pending_outbounds, pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), @@ -7733,19 +7868,14 @@ where mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; - use bitcoin::hashes::hex::FromHex; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - use bitcoin::secp256k1::ecdsa::Signature; - use bitcoin::secp256k1::ffi::Signature as FFISignature; - use bitcoin::blockdata::script::Script; - use bitcoin::Txid; use core::time::Duration; use core::sync::atomic::Ordering; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, InterceptId}; use crate::ln::functional_test_utils::*; use crate::ln::msgs; - use crate::ln::msgs::{ChannelMessageHandler, OptionalField}; + use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; @@ -7962,7 +8092,7 @@ mod tests { let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); create_announced_chan_between_nodes(&nodes, 0, 1); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); // To start (1), send a regular payment but don't claim it. @@ -8059,8 +8189,6 @@ mod tests { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -8070,7 +8198,7 @@ mod tests { }; let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), @@ -8104,8 +8232,6 @@ mod tests { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -8115,7 +8241,7 @@ mod tests { }; let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), @@ -8170,6 +8296,40 @@ mod tests { } } + #[test] + fn test_drop_disconnected_peers_when_removing_channels() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chan = create_announced_chan_between_nodes(&nodes, 0, 1); + + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id()); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap(); + check_closed_broadcast!(nodes[0], true); + check_added_monitors!(nodes[0], 1); + check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed); + + { + // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been + // disconnected and the channel between has been force closed. + let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap(); + // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed. + assert_eq!(nodes_0_per_peer_state.len(), 1); + assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some()); + } + + nodes[0].node.timer_tick_occurred(); + + { + // Assert that nodes[1] has now been removed. + assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0); + } + } + #[test] fn bad_inbound_payment_hash() { // Add coverage for checking that a user-provided payment hash matches the payment secret. @@ -8314,19 +8474,22 @@ mod tests { fn check_not_connected_to_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Not connected to node: {}", expected_public_key); - check_api_misuse_error_message(expected_message, res_err) + check_api_error_message(expected_message, res_err) } fn check_unkown_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Can't find a peer matching the passed counterparty node_id {}", expected_public_key); - check_api_misuse_error_message(expected_message, res_err) + check_api_error_message(expected_message, res_err) } - fn check_api_misuse_error_message(expected_err_message: String, res_err: Result) { + fn check_api_error_message(expected_err_message: String, res_err: Result) { match res_err { Err(APIError::APIMisuseError { err }) => { assert_eq!(err, expected_err_message); }, + Err(APIError::ChannelUnavailable { err }) => { + assert_eq!(err, expected_err_message); + }, Ok(_) => panic!("Unexpected Ok"), Err(_) => panic!("Unexpected Error"), } @@ -8334,170 +8497,240 @@ mod tests { #[test] fn test_api_calls_with_unkown_counterparty_node() { - // Tests that our API functions and message handlers that expects a `counterparty_node_id` - // as input, behaves as expected if the `counterparty_node_id` is an unkown peer in the + // Tests that our API functions that expects a `counterparty_node_id` as input, behaves as + // expected if the `counterparty_node_id` is an unkown peer in the // `ChannelManager::per_peer_state` map. let chanmon_cfg = create_chanmon_cfgs(2); let node_cfg = create_node_cfgs(2, &chanmon_cfg); let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[None, None]); let nodes = create_network(2, &node_cfg, &node_chanmgr); - // Boilerplate code to produce `open_channel` and `accept_channel` msgs more densly than - // creating dummy ones. - nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 1_000_000, 500_000_000, 42, None).unwrap(); - let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); - nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); - let accept_channel_msg = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); - // Dummy values let channel_id = [4; 32]; - let signature = Signature::from(unsafe { FFISignature::new() }); let unkown_public_key = PublicKey::from_secret_key(&Secp256k1::signing_only(), &SecretKey::from_slice(&[42; 32]).unwrap()); let intercept_id = InterceptId([0; 32]); - // Dummy msgs - let funding_created_msg = msgs::FundingCreated { - temporary_channel_id: open_channel_msg.temporary_channel_id, - funding_txid: Txid::from_hex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(), - funding_output_index: 0, - signature: signature, - }; - - let funding_signed_msg = msgs::FundingSigned { - channel_id: channel_id, - signature: signature, - }; - - let channel_ready_msg = msgs::ChannelReady { - channel_id: channel_id, - next_per_commitment_point: unkown_public_key, - short_channel_id_alias: None, - }; - - let announcement_signatures_msg = msgs::AnnouncementSignatures { - channel_id: channel_id, - short_channel_id: 0, - node_signature: signature, - bitcoin_signature: signature, - }; - - let channel_reestablish_msg = msgs::ChannelReestablish { - channel_id: channel_id, - next_local_commitment_number: 0, - next_remote_commitment_number: 0, - data_loss_protect: OptionalField::Absent, - }; - - let closing_signed_msg = msgs::ClosingSigned { - channel_id: channel_id, - fee_satoshis: 1000, - signature: signature, - fee_range: None, - }; - - let shutdown_msg = msgs::Shutdown { - channel_id: channel_id, - scriptpubkey: Script::new(), - }; - - let onion_routing_packet = msgs::OnionPacket { - version: 255, - public_key: Ok(unkown_public_key), - hop_data: [1; 20*65], - hmac: [2; 32] - }; - - let update_add_htlc_msg = msgs::UpdateAddHTLC { - channel_id: channel_id, - htlc_id: 0, - amount_msat: 1000000, - payment_hash: PaymentHash([1; 32]), - cltv_expiry: 821716, - onion_routing_packet - }; - - let commitment_signed_msg = msgs::CommitmentSigned { - channel_id: channel_id, - signature: signature, - htlc_signatures: Vec::new(), - }; + // Test the API functions. + check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key); - let update_fee_msg = msgs::UpdateFee { - channel_id: channel_id, - feerate_per_kw: 1000, - }; + check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&channel_id, &unkown_public_key, 42), unkown_public_key); - let malformed_update_msg = msgs::UpdateFailMalformedHTLC{ - channel_id: channel_id, - htlc_id: 0, - sha256_of_onion: [1; 32], - failure_code: 0x8000, - }; + check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key); - let fulfill_update_msg = msgs::UpdateFulfillHTLC{ - channel_id: channel_id, - htlc_id: 0, - payment_preimage: PaymentPreimage([1; 32]), - }; + check_unkown_peer_error(nodes[0].node.force_close_broadcasting_latest_txn(&channel_id, &unkown_public_key), unkown_public_key); - let fail_update_msg = msgs::UpdateFailHTLC{ - channel_id: channel_id, - htlc_id: 0, - reason: msgs::OnionErrorPacket { data: Vec::new()}, - }; + check_unkown_peer_error(nodes[0].node.force_close_without_broadcasting_txn(&channel_id, &unkown_public_key), unkown_public_key); - let revoke_and_ack_msg = msgs::RevokeAndACK { - channel_id: channel_id, - per_commitment_secret: [1; 32], - next_per_commitment_point: unkown_public_key, - }; + check_unkown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unkown_public_key, 1_000_000), unkown_public_key); - // Test the API functions and message handlers. - check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key); + check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key); + } - nodes[1].node.handle_open_channel(&unkown_public_key, &open_channel_msg); + #[test] + fn test_connection_limiting() { + // Test that we limit un-channel'd peers and un-funded channels properly. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - nodes[0].node.handle_accept_channel(&unkown_public_key, &accept_channel_msg); + // Note that create_network connects the nodes together for us - check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&open_channel_msg.temporary_channel_id, &unkown_public_key, 42), unkown_public_key); + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); - nodes[1].node.handle_funding_created(&unkown_public_key, &funding_created_msg); + let mut funding_tx = None; + for idx in 0..super::MAX_UNFUNDED_CHANS_PER_PEER { + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + let accept_channel = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_funding_signed(&unkown_public_key, &funding_signed_msg); + if idx == 0 { + nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_channel); + let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100_000, 42); + funding_tx = Some(tx.clone()); + nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), tx).unwrap(); + let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()); - nodes[0].node.handle_channel_ready(&unkown_public_key, &channel_ready_msg); + nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg); + check_added_monitors!(nodes[1], 1); + let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()); - nodes[1].node.handle_announcement_signatures(&unkown_public_key, &announcement_signatures_msg); + nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed); + check_added_monitors!(nodes[0], 1); + } + open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes(); + } - check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key); + // A MAX_UNFUNDED_CHANS_PER_PEER + 1 channel will be summarily rejected + open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes(); + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id, + open_channel_msg.temporary_channel_id); + + // Further, because all of our channels with nodes[0] are inbound, and none of them funded, + // it doesn't count as a "protected" peer, i.e. it counts towards the MAX_NO_CHANNEL_PEERS + // limit. + let mut peer_pks = Vec::with_capacity(super::MAX_NO_CHANNEL_PEERS); + for _ in 1..super::MAX_NO_CHANNEL_PEERS { + let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, + &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); + peer_pks.push(random_pk); + nodes[1].node.peer_connected(&random_pk, &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + } + let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, + &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); + nodes[1].node.peer_connected(&last_random_pk, &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err(); + + // Also importantly, because nodes[0] isn't "protected", we will refuse a reconnection from + // them if we have too many un-channel'd peers. + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + let chan_closed_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(chan_closed_events.len(), super::MAX_UNFUNDED_CHANS_PER_PEER - 1); + for ev in chan_closed_events { + if let Event::ChannelClosed { .. } = ev { } else { panic!(); } + } + nodes[1].node.peer_connected(&last_random_pk, &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err(); + + // but of course if the connection is outbound its allowed... + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap(); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + + // Now nodes[0] is disconnected but still has a pending, un-funded channel lying around. + // Even though we accept one more connection from new peers, we won't actually let them + // open channels. + assert!(peer_pks.len() > super::MAX_UNFUNDED_CHANNEL_PEERS - 1); + for i in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 { + nodes[1].node.handle_open_channel(&peer_pks[i], &open_channel_msg); + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, peer_pks[i]); + open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes(); + } + nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg); + assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id, + open_channel_msg.temporary_channel_id); + + // Of course, however, outbound channels are always allowed + nodes[1].node.create_channel(last_random_pk, 100_000, 0, 42, None).unwrap(); + get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, last_random_pk); + + // If we fund the first channel, nodes[0] has a live on-chain channel with us, it is now + // "protected" and can connect again. + mine_transaction(&nodes[1], funding_tx.as_ref().unwrap()); + nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id()); + + // Further, because the first channel was funded, we can open another channel with + // last_random_pk. + nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg); + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk); + } - check_unkown_peer_error(nodes[0].node.force_close_broadcasting_latest_txn(&channel_id, &unkown_public_key), unkown_public_key); + #[test] + fn test_outbound_chans_unlimited() { + // Test that we never refuse an outbound channel even if a peer is unfuned-channel-limited + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - check_unkown_peer_error(nodes[0].node.force_close_without_broadcasting_txn(&channel_id, &unkown_public_key), unkown_public_key); + // Note that create_network connects the nodes together for us - check_unkown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unkown_public_key, 1_000_000), unkown_public_key); + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); - check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key); + for _ in 0..super::MAX_UNFUNDED_CHANS_PER_PEER { + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes(); + } - nodes[0].node.handle_shutdown(&unkown_public_key, &shutdown_msg); + // Once we have MAX_UNFUNDED_CHANS_PER_PEER unfunded channels, new inbound channels will be + // rejected. + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id, + open_channel_msg.temporary_channel_id); - nodes[1].node.handle_closing_signed(&unkown_public_key, &closing_signed_msg); + // but we can still open an outbound channel. + nodes[1].node.create_channel(nodes[0].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_channel_reestablish(&unkown_public_key, &channel_reestablish_msg); + // but even with such an outbound channel, additional inbound channels will still fail. + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); + assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id, + open_channel_msg.temporary_channel_id); + } - nodes[1].node.handle_update_add_htlc(&unkown_public_key, &update_add_htlc_msg); + #[test] + fn test_0conf_limiting() { + // Tests that we properly limit inbound channels when we have the manual-channel-acceptance + // flag set and (sometimes) accept channels as 0conf. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let mut settings = test_default_channel_config(); + settings.manually_accept_inbound_channels = true; + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, Some(settings)]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - nodes[1].node.handle_commitment_signed(&unkown_public_key, &commitment_signed_msg); + // Note that create_network connects the nodes together for us - nodes[1].node.handle_update_fail_malformed_htlc(&unkown_public_key, &malformed_update_msg); + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap(); + let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); - nodes[1].node.handle_update_fail_htlc(&unkown_public_key, &fail_update_msg); + // First, get us up to MAX_UNFUNDED_CHANNEL_PEERS so we can test at the edge + for _ in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 { + let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, + &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); + nodes[1].node.peer_connected(&random_pk, &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); - nodes[1].node.handle_update_fulfill_htlc(&unkown_public_key, &fulfill_update_msg); + nodes[1].node.handle_open_channel(&random_pk, &open_channel_msg); + let events = nodes[1].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => { + nodes[1].node.accept_inbound_channel(&temporary_channel_id, &random_pk, 23).unwrap(); + } + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, random_pk); + open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes(); + } - nodes[1].node.handle_revoke_and_ack(&unkown_public_key, &revoke_and_ack_msg); + // If we try to accept a channel from another peer non-0conf it will fail. + let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, + &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); + nodes[1].node.peer_connected(&last_random_pk, &msgs::Init { + features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg); + let events = nodes[1].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => { + match nodes[1].node.accept_inbound_channel(&temporary_channel_id, &last_random_pk, 23) { + Err(APIError::APIMisuseError { err }) => + assert_eq!(err, "Too many peers with unfunded channels, refusing to accept new ones"), + _ => panic!(), + } + } + _ => panic!("Unexpected event"), + } + assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id, + open_channel_msg.temporary_channel_id); - nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg); + // ...however if we accept the same channel 0conf it should work just fine. + nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg); + let events = nodes[1].node.get_and_clear_pending_events(); + match events[0] { + Event::OpenChannelRequest { temporary_channel_id, .. } => { + nodes[1].node.accept_inbound_channel_from_trusted_peer_0conf(&temporary_channel_id, &last_random_pk, 23).unwrap(); + } + _ => panic!("Unexpected event"), + } + get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk); } #[cfg(anchors)] @@ -8585,7 +8818,8 @@ pub mod bench { let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); - let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(genesis_hash, &logger_a))); + let scorer = Mutex::new(test_utils::TestScorer::new()); + let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(genesis_hash, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); config.channel_handshake_config.minimum_depth = 1; @@ -8609,8 +8843,8 @@ pub mod bench { }); let node_b_holder = NodeHolder { node: &node_b }; - node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }).unwrap(); - node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }).unwrap(); + node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }, true).unwrap(); + node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }, false).unwrap(); node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap(); node_b.handle_open_channel(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id())); node_a.handle_accept_channel(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id())); @@ -8676,7 +8910,7 @@ pub mod bench { let usable_channels = $node_a.list_usable_channels(); let payment_params = PaymentParameters::from_node_id($node_b.get_our_node_id(), TEST_FINAL_CLTV) .with_features($node_b.invoice_features()); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let seed = [3u8; 32]; let keys_manager = KeysManager::new(&seed, 42, 42); let random_seed_bytes = keys_manager.get_secure_random_bytes();