X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=10d2a69d20fc2c1d321ef5d45d0497fc8e08a147;hb=30b9d9fbeaa62537beb8d3ea0b2866703d0d7c92;hp=59d8e387529412a5d0dcd8adc61050791d8643fe;hpb=f78448c604e3859d6c387d253549474e667e9527;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 59d8e387..10d2a69d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -487,6 +487,22 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// The peer is currently connected (i.e. we've seen a + /// [`ChannelMessageHandler::peer_connected`] and no corresponding + /// [`ChannelMessageHandler::peer_disconnected`]. + is_connected: bool, +} + +impl PeerState { + /// Indicates that a peer meets the criteria where we're ok to remove it from our storage. + /// If true is passed for `require_disconnected`, the function will return false if we haven't + /// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`. + fn ok_to_remove(&self, require_disconnected: bool) -> bool { + if require_disconnected && self.is_connected { + return false + } + self.channel_by_id.len() == 0 + } } /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is @@ -763,9 +779,8 @@ where /// very far in the past, and can only ever be up to two hours in the future. highest_seen_timestamp: AtomicUsize, - /// The bulk of our storage will eventually be here (message queues and the like). Currently - /// the `per_peer_state` stores our channels on a per-peer basis, as well as the peer's latest - /// features. + /// The bulk of our storage. Currently the `per_peer_state` stores our channels on a per-peer + /// basis, as well as the peer's latest features. /// /// If we are connected to a peer we always at least have an entry here, even if no channels /// are currently open with that peer. @@ -1175,9 +1190,9 @@ pub enum RecentPaymentDetails { /// made before LDK version 0.0.104. payment_hash: Option, }, - /// After a payment is explicitly abandoned by calling [`ChannelManager::abandon_payment`], it - /// is marked as abandoned until an [`Event::PaymentFailed`] is generated. A payment could also - /// be marked as abandoned if pathfinding fails repeatedly or retries have been exhausted. + /// After a payment's retries are exhausted per the provided [`Retry`], or it is explicitly + /// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all + /// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated. Abandoned { /// Hash of the payment that we have given up trying to send. payment_hash: PaymentHash, @@ -1243,26 +1258,6 @@ macro_rules! handle_error { let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.append(&mut msg_events); } - #[cfg(any(feature = "_test_utils", test))] - { - if let None = per_peer_state.get(&$counterparty_node_id) { - // This shouldn't occour in tests unless an unkown counterparty_node_id - // has been passed to our message handling functions. - let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", $counterparty_node_id); - match err.action { - msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { ref channel_id, ref data } - } - => { - assert_eq!(*data, expected_error_str); - if let Some((err_channel_id, _user_channel_id)) = chan_id { - debug_assert_eq!(*channel_id, err_channel_id); - } - } - _ => debug_assert!(false, "Unexpected event"), - } - } - } } // Return error in case higher-API need one @@ -1592,12 +1587,10 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&their_network_key); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Not connected to node: {}", their_network_key) }); - } + let peer_state_mutex = per_peer_state.get(&their_network_key) + .ok_or_else(|| APIError::APIMisuseError{ err: format!("Not connected to node: {}", their_network_key) })?; - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state = peer_state_mutex.lock().unwrap(); let channel = { let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); let their_features = &peer_state.latest_features; @@ -1635,14 +1628,13 @@ where } fn list_channels_with_filter::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec { - let mut res = Vec::new(); // Allocate our best estimate of the number of channels we have in the `res` // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside // of the ChannelMonitor handling. Therefore reallocations may still occur, but is // unlikely as the `short_to_chan_info` map often contains 2 entries for // the same channel. - res.reserve(self.short_to_chan_info.read().unwrap().len()); + let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); { let best_block_height = self.best_block.read().unwrap().height(); let per_peer_state = self.per_peer_state.read().unwrap(); @@ -1726,7 +1718,7 @@ where /// /// This can be useful for payments that may have been prepared, but ultimately not sent, as a /// result of a crash. If such a payment exists, is not listed here, and an - /// [`Event::PaymentSent`] has not been received, you may consider retrying the payment. + /// [`Event::PaymentSent`] has not been received, you may consider resending the payment. /// /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn list_recent_payments(&self) -> Vec { @@ -1772,12 +1764,10 @@ where let result: Result<(), _> = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -1893,12 +1883,10 @@ where fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool) -> Result { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(peer_node_id); + let peer_state_mutex = per_peer_state.get(peer_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; let mut chan = { - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { if let Some(peer_msg) = peer_msg { @@ -1914,7 +1902,7 @@ where log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); self.finish_force_close_channel(chan.force_shutdown(broadcast)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); @@ -2201,7 +2189,7 @@ where let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); @@ -2315,7 +2303,9 @@ where /// public, and thus should be called whenever the result is going to be passed out in a /// [`MessageSendEvent::BroadcastChannelUpdate`] event. /// - /// May be called with peer_state already locked! + /// Note that in `internal_closing_signed`, this function is called without the `peer_state` + /// corresponding to the channel's counterparty locked, as the channel been removed from the + /// storage and the `peer_state` lock has been dropped. fn get_channel_update_for_broadcast(&self, chan: &Channel<::Signer>) -> Result { if !chan.should_announce() { return Err(LightningError { @@ -2334,7 +2324,10 @@ where /// is public (only returning an Err if the channel does not yet have an assigned short_id), /// and thus MUST NOT be called unless the recipient of the resulting message has already /// provided evidence that they know about the existence of the channel. - /// May be called with peer_state already locked! + /// + /// Note that through `internal_closing_signed`, this function is called without the + /// `peer_state` corresponding to the channel's counterparty locked, as the channel been + /// removed from the storage and the `peer_state` lock has been dropped. fn get_channel_update_for_unicast(&self, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id())); let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) { @@ -2395,11 +2388,9 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::InvalidRoute{err: "No peer matching the path's first hop found!" }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(&counterparty_node_id) + .ok_or_else(|| APIError::InvalidRoute{err: "No peer matching the path's first hop found!" })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { match { @@ -2484,8 +2475,8 @@ where /// If a pending payment is currently in-flight with the same [`PaymentId`] provided, this /// method will error with an [`APIError::InvalidRoute`]. Note, however, that once a payment /// is no longer pending (either via [`ChannelManager::abandon_payment`], or handling of an - /// [`Event::PaymentSent`]) LDK will not stop you from sending a second payment with the same - /// [`PaymentId`]. + /// [`Event::PaymentSent`] or [`Event::PaymentFailed`]) LDK will not stop you from sending a + /// second payment with the same [`PaymentId`]. /// /// Thus, in order to ensure duplicate payments are not sent, you should implement your own /// tracking of payments, including state to indicate once a payment has completed. Because you @@ -2530,6 +2521,7 @@ where /// [`Route`], we assume the invoice had the basic_mpp feature set. /// /// [`Event::PaymentSent`]: events::Event::PaymentSent + /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { @@ -2546,7 +2538,7 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments .send_payment(payment_hash, payment_secret, payment_id, retry_strategy, route_params, - &self.router, self.list_usable_channels(), self.compute_inflight_htlcs(), + &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2567,48 +2559,25 @@ where } - /// Retries a payment along the given [`Route`]. - /// - /// Errors returned are a superset of those returned from [`send_payment`], so see - /// [`send_payment`] documentation for more details on errors. This method will also error if the - /// retry amount puts the payment more than 10% over the payment's total amount, if the payment - /// for the given `payment_id` cannot be found (likely due to timeout or success), or if - /// further retries have been disabled with [`abandon_payment`]. - /// - /// [`send_payment`]: [`ChannelManager::send_payment`] - /// [`abandon_payment`]: [`ChannelManager::abandon_payment`] - pub fn retry_payment(&self, route: &Route, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); - self.pending_outbound_payments.retry_payment_with_route(route, payment_id, &self.entropy_source, &self.node_signer, best_block_height, - |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| - self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) - } - - /// Signals that no further retries for the given payment will occur. + /// Signals that no further retries for the given payment should occur. Useful if you have a + /// pending outbound payment with retries remaining, but wish to stop retrying the payment before + /// retries are exhausted. /// - /// After this method returns, no future calls to [`retry_payment`] for the given `payment_id` - /// are allowed. If no [`Event::PaymentFailed`] event had been generated before, one will be - /// generated as soon as there are no remaining pending HTLCs for this payment. + /// If no [`Event::PaymentFailed`] event had been generated before, one will be generated as soon + /// as there are no remaining pending HTLCs for this payment. /// /// Note that calling this method does *not* prevent a payment from succeeding. You must still /// wait until you receive either a [`Event::PaymentFailed`] or [`Event::PaymentSent`] event to /// determine the ultimate status of a payment. /// /// If an [`Event::PaymentFailed`] event is generated and we restart without this - /// [`ChannelManager`] having been persisted, the payment may still be in the pending state - /// upon restart. This allows further calls to [`retry_payment`] (and requiring a second call - /// to [`abandon_payment`] to mark the payment as failed again). Otherwise, future calls to - /// [`retry_payment`] will fail with [`PaymentSendFailure::ParameterError`]. + /// [`ChannelManager`] having been persisted, another [`Event::PaymentFailed`] may be generated. /// - /// [`abandon_payment`]: Self::abandon_payment - /// [`retry_payment`]: Self::retry_payment /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn abandon_payment(&self, payment_id: PaymentId) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - if let Some(payment_failed_ev) = self.pending_outbound_payments.abandon_payment(payment_id) { - self.pending_events.lock().unwrap().push(payment_failed_ev); - } + self.pending_outbound_payments.abandon_payment(payment_id, &self.pending_events); } /// Send a spontaneous payment, which is a payment that does not require the recipient to have @@ -2646,7 +2615,7 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), - self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, + || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2675,12 +2644,10 @@ where &self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput ) -> Result<(), APIError> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let (chan, msg) = { let (res, chan) = { @@ -2846,11 +2813,9 @@ where &self.total_consistency_lock, &self.persistence_notifier, ); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for channel_id in channel_ids { if !peer_state.channel_by_id.contains_key(channel_id) { @@ -2903,24 +2868,22 @@ where let next_hop_scid = { let peer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = peer_state_lock.get(&next_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.get(next_hop_channel_id) { - Some(chan) => { - if !chan.is_usable() { - return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) - }) - } - chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) - }, - None => return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) - }) - } - } else { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) }); + let peer_state_mutex = peer_state_lock.get(&next_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.get(next_hop_channel_id) { + Some(chan) => { + if !chan.is_usable() { + return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) + }) + } + chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) + }, + None => return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) + }) } }; @@ -3100,7 +3063,7 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { forwarding_channel_not_found!(); continue; } @@ -3387,7 +3350,8 @@ where let best_block_height = self.best_block.read().unwrap().height(); self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(), - || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger, + || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, + &self.pending_events, &self.logger, |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)); @@ -3492,6 +3456,7 @@ where /// the channel. /// * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs /// with the current `ChannelConfig`. + /// * Removing peers which have disconnected but and no longer have any channels. /// /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate /// estimate fetches. @@ -3504,19 +3469,21 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); + let mut pending_peers_awaiting_removal = Vec::new(); { let per_peer_state = self.per_peer_state.read().unwrap(); for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; + let counterparty_node_id = *counterparty_node_id; peer_state.channel_by_id.retain(|chan_id, chan| { let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } if let Err(e) = chan.timer_check_closing_negotiation_progress() { let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id); - handle_errors.push((Err(err), *counterparty_node_id)); + handle_errors.push((Err(err), counterparty_node_id)); if needs_close { return false; } } @@ -3550,6 +3517,36 @@ where true }); + if peer_state.ok_to_remove(true) { + pending_peers_awaiting_removal.push(counterparty_node_id); + } + } + } + + // When a peer disconnects but still has channels, the peer's `peer_state` entry in the + // `per_peer_state` is not removed by the `peer_disconnected` function. If the channels + // of to that peer is later closed while still being disconnected (i.e. force closed), + // we therefore need to remove the peer from `peer_state` separately. + // To avoid having to take the `per_peer_state` `write` lock once the channels are + // closed, we instead remove such peers awaiting removal here on a timer, to limit the + // negative effects on parallelism as much as possible. + if pending_peers_awaiting_removal.len() > 0 { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + for counterparty_node_id in pending_peers_awaiting_removal { + match per_peer_state.entry(counterparty_node_id) { + hash_map::Entry::Occupied(entry) => { + // Remove the entry if the peer is still disconnected and we still + // have no channels to the peer. + let remove_entry = { + let peer_state = entry.get().lock().unwrap(); + peer_state.ok_to_remove(true) + }; + if remove_entry { + entry.remove_entry(); + } + }, + hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ } + } } } @@ -3727,7 +3724,7 @@ where fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { #[cfg(any(feature = "_test_utils", test))] { - // Ensure that no peer state channel storage lock is not held when calling this + // Ensure that the peer state channel storage lock is not held when calling this // function. // This ensures that future code doesn't introduce a lock_order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling @@ -3850,7 +3847,7 @@ where let mut expected_amt_msat = None; let mut valid_mpp = true; let mut errs = Vec::new(); - let mut per_peer_state = Some(self.per_peer_state.read().unwrap()); + let per_peer_state = self.per_peer_state.read().unwrap(); for htlc in sources.iter() { let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), @@ -3860,16 +3857,16 @@ where } }; - if let None = per_peer_state.as_ref().unwrap().get(&counterparty_node_id) { + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { valid_mpp = false; break; } - let peer_state_mutex = per_peer_state.as_ref().unwrap().get(&counterparty_node_id).unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let None = peer_state.channel_by_id.get(&chan_id) { + if peer_state.channel_by_id.get(&chan_id).is_none() { valid_mpp = false; break; } @@ -3895,14 +3892,13 @@ where claimable_amt_msat += htlc.value; } + mem::drop(per_peer_state); if sources.is_empty() || expected_amt_msat.is_none() { - mem::drop(per_peer_state); self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); log_info!(self.logger, "Attempted to claim an incomplete payment which no longer had any available HTLCs!"); return; } if claimable_amt_msat != expected_amt_msat.unwrap() { - mem::drop(per_peer_state); self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); log_info!(self.logger, "Attempted to claim an incomplete payment, expected {} msat, had {} available to claim.", expected_amt_msat.unwrap(), claimable_amt_msat); @@ -3910,8 +3906,7 @@ where } if valid_mpp { for htlc in sources.drain(..) { - if per_peer_state.is_none() { per_peer_state = Some(self.per_peer_state.read().unwrap()); } - if let Err((pk, err)) = self.claim_funds_from_hop(per_peer_state.take().unwrap(), + if let Err((pk, err)) = self.claim_funds_from_hop( htlc.prev_hop, payment_preimage, |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })) { @@ -3923,7 +3918,6 @@ where } } } - mem::drop(per_peer_state); if !valid_mpp { for htlc in sources.drain(..) { let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec(); @@ -3944,11 +3938,11 @@ where } fn claim_funds_from_hop) -> Option>(&self, - per_peer_state_lock: RwLockReadGuard::Signer>>>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc) -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! + let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { @@ -3956,83 +3950,76 @@ where None => None }; - let (found_channel, mut peer_state_opt) = if counterparty_node_id_opt.is_some() && per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).is_some() { - let peer_mutex = per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).unwrap(); - let peer_state = peer_mutex.lock().unwrap(); - let found_channel = peer_state.channel_by_id.contains_key(&chan_id); - (found_channel, Some(peer_state)) - } else { (false, None) }; - - if found_channel { - let peer_state = &mut *peer_state_opt.as_mut().unwrap(); - if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { - let counterparty_node_id = chan.get().get_counterparty_node_id(); - match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { - Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, - "Failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, e); - let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); - mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - return Err((counterparty_node_id, err)); - } - } - if let Some((msg, commitment_signed)) = msgs { - log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", - log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - } - }); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - Ok(()) - } else { - Ok(()) - } - }, - Err((e, monitor_update)) => { + let mut peer_state_opt = counterparty_node_id_opt.as_ref().map( + |counterparty_node_id| per_peer_state.get(counterparty_node_id).map( + |peer_mutex| peer_mutex.lock().unwrap() + ) + ).unwrap_or(None); + + if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id)) + { + let counterparty_node_id = chan.get().get_counterparty_node_id(); + match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { + Ok(msgs_monitor_option) => { + if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { ChannelMonitorUpdateStatus::Completed => {}, e => { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same update and try - // again on restart. - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, - "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, + "Failed to update channel monitor with preimage {:?}: {:?}", payment_preimage, e); - }, + let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); + mem::drop(peer_state_opt); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + return Err((counterparty_node_id, err)); + } } - let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); - if drop { - chan.remove_entry(); + if let Some((msg, commitment_signed)) = msgs { + log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", + log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); + peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: counterparty_node_id, + updates: msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: vec![msg], + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + } + }); } mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(None)); - Err((counterparty_node_id, res)) - }, - } - } else { - // We've held the peer_state mutex since finding the channel and setting - // found_channel to true, so the channel can't have been dropped. - unreachable!() + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + Ok(()) + } else { + Ok(()) + } + }, + Err((e, monitor_update)) => { + match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { + ChannelMonitorUpdateStatus::Completed => {}, + e => { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same update and try + // again on restart. + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, + "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", + payment_preimage, e); + }, + } + let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); + if drop { + chan.remove_entry(); + } + mem::drop(peer_state_opt); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(None)); + Err((counterparty_node_id, res)) + }, } } else { let preimage_update = ChannelMonitorUpdate { @@ -4053,7 +4040,7 @@ where payment_preimage, update_res); } mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); + mem::drop(per_peer_state); // Note that we do process the completion action here. This totally could be a // duplicate claim, but we have no way of knowing without interrogating the // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are @@ -4075,7 +4062,7 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; - let res = self.claim_funds_from_hop(self.per_peer_state.read().unwrap(), hop_data, payment_preimage, + let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat { @@ -4207,7 +4194,7 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock; let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { return } + if peer_state_mutex_opt.is_none() { return } peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; let mut channel = { @@ -4297,11 +4284,9 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut channel) => { @@ -4349,11 +4334,12 @@ where let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone())) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone()) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider, counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration, @@ -4401,11 +4387,12 @@ where fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4428,13 +4415,14 @@ where fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) + })?; let ((funding_msg, monitor, mut channel_ready), mut chan) = { let best_block = *self.best_block.read().unwrap(); - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4471,7 +4459,7 @@ where // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the // peer exists, despite the inner PeerState potentially having no channels after removing // the channel above. - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { @@ -4506,12 +4494,13 @@ where let funding_tx = { let best_block = *self.best_block.read().unwrap(); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4549,11 +4538,12 @@ where fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4592,11 +4582,12 @@ where let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -4645,12 +4636,13 @@ where fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; let (tx, chan_option) = { - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -4679,7 +4671,7 @@ where } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -4702,11 +4694,12 @@ where let pending_forward_info = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4743,11 +4736,12 @@ where fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let (htlc_source, forwarded_htlc_value) = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4762,11 +4756,12 @@ where fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4779,11 +4774,12 @@ where fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4800,11 +4796,12 @@ where fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4943,11 +4940,12 @@ where let mut htlcs_to_fail = Vec::new(); let res = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5008,11 +5006,12 @@ where fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5025,11 +5024,12 @@ where fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5044,7 +5044,7 @@ where ), chan), // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()), }); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5063,7 +5063,7 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { return Ok(NotifyOption::SkipPersist) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); @@ -5098,11 +5098,12 @@ where let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5418,7 +5419,8 @@ where /// [`PaymentHash`] and [`PaymentPreimage`] for you. /// /// The [`PaymentPreimage`] will ultimately be returned to you in the [`PaymentClaimable`], which - /// will have the [`PaymentClaimable::payment_preimage`] field filled in. That should then be + /// will have the [`PaymentClaimable::purpose`] be [`PaymentPurpose::InvoicePayment`] with + /// its [`PaymentPurpose::InvoicePayment::payment_preimage`] field filled in. That should then be /// passed directly to [`claim_funds`]. /// /// See [`create_inbound_payment_for_hash`] for detailed documentation on behavior and requirements. @@ -5438,7 +5440,9 @@ where /// /// [`claim_funds`]: Self::claim_funds /// [`PaymentClaimable`]: events::Event::PaymentClaimable - /// [`PaymentClaimable::payment_preimage`]: events::Event::PaymentClaimable::payment_preimage + /// [`PaymentClaimable::purpose`]: events::Event::PaymentClaimable::purpose + /// [`PaymentPurpose::InvoicePayment`]: events::PaymentPurpose::InvoicePayment + /// [`PaymentPurpose::InvoicePayment::payment_preimage`]: events::PaymentPurpose::InvoicePayment::payment_preimage /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash pub fn create_inbound_payment(&self, min_value_msat: Option, invoice_expiry_delta_secs: u32, min_final_cltv_expiry_delta: Option) -> Result<(PaymentHash, PaymentSecret), ()> { @@ -5717,9 +5721,7 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if peer_state.pending_msg_events.len() > 0 { - let mut peer_pending_events = Vec::new(); - mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events); - pending_events.append(&mut peer_pending_events); + pending_events.append(&mut peer_state.pending_msg_events); } } @@ -5970,7 +5972,7 @@ where msg: announcement, // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(channel).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()), }); } } @@ -6250,9 +6252,8 @@ where fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); - let mut no_channels_remain = true; let mut per_peer_state = self.per_peer_state.write().unwrap(); - { + let remove_peer = { log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.", log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" }); if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { @@ -6265,8 +6266,6 @@ where update_maps_on_chan_removal!(self, chan); self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer); return false; - } else { - no_channels_remain = false; } true }); @@ -6286,6 +6285,7 @@ where &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, &events::MessageSendEvent::HandleError { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, @@ -6294,9 +6294,12 @@ where &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, } }); - } - } - if no_channels_remain { + debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect"); + peer_state.is_connected = false; + peer_state.ok_to_remove(true) + } else { true } + }; + if remove_peer { per_peer_state.remove(counterparty_node_id); } mem::drop(per_peer_state); @@ -6324,10 +6327,14 @@ where channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + is_connected: true, })); }, hash_map::Entry::Occupied(e) => { - e.get().lock().unwrap().latest_features = init_msg.features.clone(); + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; }, } } @@ -6378,7 +6385,7 @@ where let channel_ids: Vec<[u8; 32]> = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { return; } + if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; peer_state.channel_by_id.keys().cloned().collect() @@ -6392,7 +6399,7 @@ where // First check if we can advance the channel type and try again. let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { return; } + if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) { @@ -6886,6 +6893,7 @@ where best_block.block_hash().write(writer)?; } + let mut serializable_peer_count: u64 = 0; { let per_peer_state = self.per_peer_state.read().unwrap(); let mut unfunded_channels = 0; @@ -6893,6 +6901,9 @@ where for (_, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + if !peer_state.ok_to_remove(false) { + serializable_peer_count += 1; + } number_of_channels += peer_state.channel_by_id.len(); for (_, channel) in peer_state.channel_by_id.iter() { if !channel.is_funding_initiated() { @@ -6943,11 +6954,18 @@ where htlc_purposes.push(purpose); } - (per_peer_state.len() as u64).write(writer)?; + (serializable_peer_count).write(writer)?; for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() { - peer_pubkey.write(writer)?; - let peer_state = peer_state_mutex.lock().unwrap(); - peer_state.latest_features.write(writer)?; + let peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &*peer_state_lock; + // Peers which we have no channels to should be dropped once disconnected. As we + // disconnect all peers when shutting down and serializing the ChannelManager, we + // consider all peers as disconnected here. There's therefore no need write peers with + // no channels. + if !peer_state.ok_to_remove(false) { + peer_pubkey.write(writer)?; + peer_state.latest_features.write(writer)?; + } } let events = self.pending_events.lock().unwrap(); @@ -7346,6 +7364,7 @@ where channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), + is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -7733,19 +7752,14 @@ where mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; - use bitcoin::hashes::hex::FromHex; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - use bitcoin::secp256k1::ecdsa::Signature; - use bitcoin::secp256k1::ffi::Signature as FFISignature; - use bitcoin::blockdata::script::Script; - use bitcoin::Txid; use core::time::Duration; use core::sync::atomic::Ordering; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, InterceptId}; use crate::ln::functional_test_utils::*; use crate::ln::msgs; - use crate::ln::msgs::{ChannelMessageHandler, OptionalField}; + use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; @@ -7962,7 +7976,7 @@ mod tests { let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); create_announced_chan_between_nodes(&nodes, 0, 1); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); // To start (1), send a regular payment but don't claim it. @@ -8059,8 +8073,6 @@ mod tests { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -8070,7 +8082,7 @@ mod tests { }; let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), @@ -8104,8 +8116,6 @@ mod tests { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -8115,7 +8125,7 @@ mod tests { }; let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), @@ -8170,6 +8180,40 @@ mod tests { } } + #[test] + fn test_drop_disconnected_peers_when_removing_channels() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chan = create_announced_chan_between_nodes(&nodes, 0, 1); + + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap(); + check_closed_broadcast!(nodes[0], true); + check_added_monitors!(nodes[0], 1); + check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed); + + { + // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been + // disconnected and the channel between has been force closed. + let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap(); + // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed. + assert_eq!(nodes_0_per_peer_state.len(), 1); + assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some()); + } + + nodes[0].node.timer_tick_occurred(); + + { + // Assert that nodes[1] has now been removed. + assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0); + } + } + #[test] fn bad_inbound_payment_hash() { // Add coverage for checking that a user-provided payment hash matches the payment secret. @@ -8314,19 +8358,22 @@ mod tests { fn check_not_connected_to_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Not connected to node: {}", expected_public_key); - check_api_misuse_error_message(expected_message, res_err) + check_api_error_message(expected_message, res_err) } fn check_unkown_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Can't find a peer matching the passed counterparty node_id {}", expected_public_key); - check_api_misuse_error_message(expected_message, res_err) + check_api_error_message(expected_message, res_err) } - fn check_api_misuse_error_message(expected_err_message: String, res_err: Result) { + fn check_api_error_message(expected_err_message: String, res_err: Result) { match res_err { Err(APIError::APIMisuseError { err }) => { assert_eq!(err, expected_err_message); }, + Err(APIError::ChannelUnavailable { err }) => { + assert_eq!(err, expected_err_message); + }, Ok(_) => panic!("Unexpected Ok"), Err(_) => panic!("Unexpected Error"), } @@ -8334,140 +8381,23 @@ mod tests { #[test] fn test_api_calls_with_unkown_counterparty_node() { - // Tests that our API functions and message handlers that expects a `counterparty_node_id` - // as input, behaves as expected if the `counterparty_node_id` is an unkown peer in the + // Tests that our API functions that expects a `counterparty_node_id` as input, behaves as + // expected if the `counterparty_node_id` is an unkown peer in the // `ChannelManager::per_peer_state` map. let chanmon_cfg = create_chanmon_cfgs(2); let node_cfg = create_node_cfgs(2, &chanmon_cfg); let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[None, None]); let nodes = create_network(2, &node_cfg, &node_chanmgr); - // Boilerplate code to produce `open_channel` and `accept_channel` msgs more densly than - // creating dummy ones. - nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 1_000_000, 500_000_000, 42, None).unwrap(); - let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); - nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); - let accept_channel_msg = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); - // Dummy values let channel_id = [4; 32]; - let signature = Signature::from(unsafe { FFISignature::new() }); let unkown_public_key = PublicKey::from_secret_key(&Secp256k1::signing_only(), &SecretKey::from_slice(&[42; 32]).unwrap()); let intercept_id = InterceptId([0; 32]); - // Dummy msgs - let funding_created_msg = msgs::FundingCreated { - temporary_channel_id: open_channel_msg.temporary_channel_id, - funding_txid: Txid::from_hex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(), - funding_output_index: 0, - signature: signature, - }; - - let funding_signed_msg = msgs::FundingSigned { - channel_id: channel_id, - signature: signature, - }; - - let channel_ready_msg = msgs::ChannelReady { - channel_id: channel_id, - next_per_commitment_point: unkown_public_key, - short_channel_id_alias: None, - }; - - let announcement_signatures_msg = msgs::AnnouncementSignatures { - channel_id: channel_id, - short_channel_id: 0, - node_signature: signature, - bitcoin_signature: signature, - }; - - let channel_reestablish_msg = msgs::ChannelReestablish { - channel_id: channel_id, - next_local_commitment_number: 0, - next_remote_commitment_number: 0, - data_loss_protect: OptionalField::Absent, - }; - - let closing_signed_msg = msgs::ClosingSigned { - channel_id: channel_id, - fee_satoshis: 1000, - signature: signature, - fee_range: None, - }; - - let shutdown_msg = msgs::Shutdown { - channel_id: channel_id, - scriptpubkey: Script::new(), - }; - - let onion_routing_packet = msgs::OnionPacket { - version: 255, - public_key: Ok(unkown_public_key), - hop_data: [1; 20*65], - hmac: [2; 32] - }; - - let update_add_htlc_msg = msgs::UpdateAddHTLC { - channel_id: channel_id, - htlc_id: 0, - amount_msat: 1000000, - payment_hash: PaymentHash([1; 32]), - cltv_expiry: 821716, - onion_routing_packet - }; - - let commitment_signed_msg = msgs::CommitmentSigned { - channel_id: channel_id, - signature: signature, - htlc_signatures: Vec::new(), - }; - - let update_fee_msg = msgs::UpdateFee { - channel_id: channel_id, - feerate_per_kw: 1000, - }; - - let malformed_update_msg = msgs::UpdateFailMalformedHTLC{ - channel_id: channel_id, - htlc_id: 0, - sha256_of_onion: [1; 32], - failure_code: 0x8000, - }; - - let fulfill_update_msg = msgs::UpdateFulfillHTLC{ - channel_id: channel_id, - htlc_id: 0, - payment_preimage: PaymentPreimage([1; 32]), - }; - - let fail_update_msg = msgs::UpdateFailHTLC{ - channel_id: channel_id, - htlc_id: 0, - reason: msgs::OnionErrorPacket { data: Vec::new()}, - }; - - let revoke_and_ack_msg = msgs::RevokeAndACK { - channel_id: channel_id, - per_commitment_secret: [1; 32], - next_per_commitment_point: unkown_public_key, - }; - - // Test the API functions and message handlers. + // Test the API functions. check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key); - nodes[1].node.handle_open_channel(&unkown_public_key, &open_channel_msg); - - nodes[0].node.handle_accept_channel(&unkown_public_key, &accept_channel_msg); - - check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&open_channel_msg.temporary_channel_id, &unkown_public_key, 42), unkown_public_key); - - nodes[1].node.handle_funding_created(&unkown_public_key, &funding_created_msg); - - nodes[0].node.handle_funding_signed(&unkown_public_key, &funding_signed_msg); - - nodes[0].node.handle_channel_ready(&unkown_public_key, &channel_ready_msg); - - nodes[1].node.handle_announcement_signatures(&unkown_public_key, &announcement_signatures_msg); + check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&channel_id, &unkown_public_key, 42), unkown_public_key); check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key); @@ -8478,26 +8408,6 @@ mod tests { check_unkown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unkown_public_key, 1_000_000), unkown_public_key); check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key); - - nodes[0].node.handle_shutdown(&unkown_public_key, &shutdown_msg); - - nodes[1].node.handle_closing_signed(&unkown_public_key, &closing_signed_msg); - - nodes[0].node.handle_channel_reestablish(&unkown_public_key, &channel_reestablish_msg); - - nodes[1].node.handle_update_add_htlc(&unkown_public_key, &update_add_htlc_msg); - - nodes[1].node.handle_commitment_signed(&unkown_public_key, &commitment_signed_msg); - - nodes[1].node.handle_update_fail_malformed_htlc(&unkown_public_key, &malformed_update_msg); - - nodes[1].node.handle_update_fail_htlc(&unkown_public_key, &fail_update_msg); - - nodes[1].node.handle_update_fulfill_htlc(&unkown_public_key, &fulfill_update_msg); - - nodes[1].node.handle_revoke_and_ack(&unkown_public_key, &revoke_and_ack_msg); - - nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg); } #[cfg(anchors)] @@ -8585,7 +8495,8 @@ pub mod bench { let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); - let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(genesis_hash, &logger_a))); + let scorer = Mutex::new(test_utils::TestScorer::new()); + let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(genesis_hash, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); config.channel_handshake_config.minimum_depth = 1; @@ -8676,7 +8587,7 @@ pub mod bench { let usable_channels = $node_a.list_usable_channels(); let payment_params = PaymentParameters::from_node_id($node_b.get_our_node_id(), TEST_FINAL_CLTV) .with_features($node_b.invoice_features()); - let scorer = test_utils::TestScorer::with_penalty(0); + let scorer = test_utils::TestScorer::new(); let seed = [3u8; 32]; let keys_manager = KeysManager::new(&seed, 42, 42); let random_seed_bytes = keys_manager.get_secure_random_bytes();