diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index ef492c0c..108bc706 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -46,14 +46,18 @@ use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfi
 use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
 #[cfg(any(feature = "_test_utils", test))]
 use crate::ln::features::InvoiceFeatures;
-use crate::routing::router::{InFlightHtlcs, PaymentParameters, Route, RouteHop, RoutePath, RouteParameters};
+use crate::routing::gossip::NetworkGraph;
+use crate::routing::router::{DefaultRouter, InFlightHtlcs, PaymentParameters, Route, RouteHop, RoutePath, Router};
+use crate::routing::scoring::ProbabilisticScorer;
 use crate::ln::msgs;
 use crate::ln::onion_utils;
 use crate::ln::onion_utils::HTLCFailReason;
 use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT};
-use crate::ln::outbound_payment::PendingOutboundPayment;
+#[cfg(test)]
+use crate::ln::outbound_payment;
+use crate::ln::outbound_payment::{OutboundPayments, PendingOutboundPayment};
 use crate::ln::wire::Encode;
-use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
+use crate::chain::keysinterface::{EntropySource, KeysInterface, KeysManager, NodeSigner, Recipient, Sign, SignerProvider};
 use crate::util::config::{UserConfig, ChannelConfig};
 use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use crate::util::events;
@@ -289,10 +293,10 @@ struct ReceiveError {
 
 type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>);
 
-/// Error type returned across the channel_state mutex boundary. When an Err is generated for a
+/// Error type returned across the peer_state mutex boundary. When an Err is generated for a
 /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel
 /// immediately (ie with no further calls on it made). Thus, this step happens inside a
-/// channel_state lock. We then return the set of things that need to be done outside the lock in
+/// peer_state lock. We then return the set of things that need to be done outside the lock in
 /// this struct and call handle_error!() on it.
 struct MsgHandleErrInternal {
@@ -431,11 +435,7 @@ struct ClaimablePayments {
 }
 
 // Note this is only exposed in cfg(test):
-pub(super) struct ChannelHolder<Signer: Sign> {
-	pub(super) by_id: HashMap<[u8; 32], Channel<Signer>>,
-	/// Messages to send to peers - pushed to in the same lock that they are generated in (except
-	/// for broadcast messages, where ordering isn't as strict).
-	pub(super) pending_msg_events: Vec<MessageSendEvent>,
+pub(super) struct ChannelHolder {
 }
 
 /// Events which we process internally but cannot be processed immediately at the generation site
@@ -457,10 +457,19 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	EmitEvent { event: events::Event },
 }
 
-/// State we hold per-peer. In the future we should put channels in here, but for now we only hold
-/// the latest Init features we heard from the peer.
-struct PeerState {
+/// State we hold per-peer.
+pub(super) struct PeerState<Signer: Sign> {
+	/// `temporary_channel_id` or `channel_id` -> `channel`.
+	///
+	/// Holds all channels where the peer is the counterparty. Once a channel has been assigned a
+	/// `channel_id`, the `temporary_channel_id` key in the map is replaced by the `channel_id`.
+	pub(super) channel_by_id: HashMap<[u8; 32], Channel<Signer>>,
+	/// The latest `InitFeatures` we heard from the peer.
 	latest_features: InitFeatures,
+	/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
+	/// for broadcast messages, where ordering isn't as strict).
+	pub(super) pending_msg_events: Vec<MessageSendEvent>,
 }
 
 /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is
@@ -488,24 +497,35 @@ struct PendingInboundPayment {
 /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
 /// SimpleRefChannelManager is the more appropriate type. Defining these type aliases prevents
-/// issues such as overly long function definitions. Note that the ChannelManager can take any
-/// type that implements KeysInterface for its keys manager, but this type alias chooses the
-/// concrete type of the KeysManager.
+/// issues such as overly long function definitions. Note that the ChannelManager can take any type
+/// that implements KeysInterface or Router for its keys manager and router, respectively, but this
+/// type alias chooses the concrete types of KeysManager and DefaultRouter.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<Arc<M>, Arc<T>, Arc<KeysManager>, Arc<F>, Arc<L>>;
+pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<
+	Arc<M>,
+	Arc<T>,
+	Arc<KeysManager>,
+	Arc<F>,
+	Arc<DefaultRouter<Arc<NetworkGraph<Arc<L>>>,
+	Arc<L>,
+	Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<L>>>, Arc<L>>>>
+	>>,
+	Arc<L>
+>;
 
 /// SimpleRefChannelManager is a type alias for a ChannelManager reference, and is the reference
 /// counterpart to the SimpleArcChannelManager type alias. Use this type by default when you don't
 /// need a ChannelManager with a static lifetime. You'll need a static lifetime in cases such as
 /// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
 /// But if this is not necessary, using a reference is more efficient. Defining these type aliases
-/// helps with issues such as long function definitions. Note that the ChannelManager can take any
-/// type that implements KeysInterface for its keys manager, but this type alias chooses the
-/// concrete type of the KeysManager.
+/// helps with issues such as overly long function definitions. Note that the ChannelManager can
+/// take any type that implements KeysInterface or Router for its keys manager and router,
+/// respectively, but this type alias chooses the concrete types of KeysManager and DefaultRouter.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'d F, &'e L>;
+pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>>, &'g L>;
 
 /// Manager which keeps track of a number of channels and sends messages to the appropriate
 /// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
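// A minimal sketch (not from this commit) of how the alias above is consumed: an
// application names only its chain::Watch, BroadcasterInterface, FeeEstimator and
// Logger implementations, while KeysManager and DefaultRouter are fixed by the alias.
// `MyWatch`, `MyBroadcaster`, `MyFeeEstimator` and `MyLogger` are hypothetical
// application types, and the surrounding module's imports are assumed.
type AppManager = SimpleArcChannelManager<MyWatch, MyBroadcaster, MyFeeEstimator, MyLogger>;

fn usable_channel_count(manager: &AppManager) -> usize {
	// The alias is just a fully-applied ChannelManager, so its methods are available as usual.
	manager.list_usable_channels().len()
}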
@@ -563,36 +583,42 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage
 // |   |
 // |   |__`claimable_payments`
 // |   |
-// |   |__`pending_outbound_payments`
+// |   |__`pending_outbound_payments` // This field's struct contains a map of pending outbounds
 // |   |
 // |   |__`channel_state`
 // |   |
-// |   |__`id_to_peer`
-// |   |
-// |   |__`short_to_chan_info`
-// |   |
 // |   |__`per_peer_state`
 // |   |
-// |   |__`outbound_scid_aliases`
-// |   |
-// |   |__`best_block`
-// |   |
-// |   |__`pending_events`
+// |   |__`peer_state`
+// |   |
+// |   |__`id_to_peer`
+// |   |
+// |   |__`short_to_chan_info`
 // |   |
-// |   |__`pending_background_events`
+// |   |__`outbound_scid_aliases`
+// |   |
+// |   |__`best_block`
+// |   |
+// |   |__`pending_events`
+// |   |
+// |   |__`pending_background_events`
 //
-pub struct ChannelManager<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
-		T::Target: BroadcasterInterface,
-		K::Target: KeysInterface,
-		F::Target: FeeEstimator,
-		L::Target: Logger,
+pub struct ChannelManager<M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref>
+where
+	M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
+	T::Target: BroadcasterInterface,
+	K::Target: KeysInterface,
+	F::Target: FeeEstimator,
+	R::Target: Router,
+	L::Target: Logger,
 {
 	default_configuration: UserConfig,
 	genesis_hash: BlockHash,
 	fee_estimator: LowerBoundedFeeEstimator<F>,
 	chain_monitor: M,
 	tx_broadcaster: T,
+	#[allow(unused)]
+	router: R,
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	#[cfg(test)]
@@ -603,9 +629,9 @@ pub struct ChannelManager
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	#[cfg(any(test, feature = "_test_utils"))]
-	pub(super) channel_state: Mutex<ChannelHolder<<K::Target as KeysInterface>::Signer>>,
+	pub(super) channel_state: Mutex<ChannelHolder>,
 	#[cfg(not(any(test, feature = "_test_utils")))]
-	channel_state: Mutex<ChannelHolder<<K::Target as KeysInterface>::Signer>>,
+	channel_state: Mutex<ChannelHolder>,
 
 	/// Storage for PaymentSecrets and any requirements on future inbound payments before we will
 	/// expose them to users via a PaymentClaimable event. HTLCs which do not meet the requirements
@@ -627,7 +653,7 @@ pub struct ChannelManager
 	/// See `PendingOutboundPayment` documentation for more info.
 	///
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
-	pending_outbound_payments: Mutex<HashMap<PaymentId, PendingOutboundPayment>>,
+	pending_outbound_payments: OutboundPayments,
 
 	/// SCID/SCID Alias -> forward infos. Key of 0 means payments received.
 	///
@@ -673,6 +699,9 @@ pub struct ChannelManager
 	/// the corresponding channel for the event, as we only have access to the `channel_id` during
 	/// the handling of the events.
 	///
+	/// Note that no consistency guarantees are made about the existence of a peer with the
+	/// `counterparty_node_id` in our other maps.
+	///
 	/// TODO:
 	/// The `counterparty_node_id` isn't passed with `MonitorEvent`s currently. To pass it, we need
 	/// to make `counterparty_node_id` a required field in `ChannelMonitor`s, which unfortunately
@@ -722,15 +751,24 @@ pub struct ChannelManager
 	/// very far in the past, and can only ever be up to two hours in the future.
 	highest_seen_timestamp: AtomicUsize,
 
-	/// The bulk of our storage will eventually be here (channels and message queues and the like).
+	/// The bulk of our storage will eventually be here (message queues and the like). Currently
+	/// the `per_peer_state` stores our channels on a per-peer basis, as well as the peer's latest
+	/// features.
+	///
 	/// If we are connected to a peer we always at least have an entry here, even if no channels
 	/// are currently open with that peer.
+	///
 	/// Because adding or removing an entry is rare, we usually take an outer read lock and then
-	/// operate on the inner value freely. Sadly, this prevents parallel operation when opening a
-	/// new channel.
+	/// operate on the inner value freely. This allows parallel per-peer operation on
+	/// channels.
+	///
+	/// Note that the same thread must never acquire two inner `PeerState` locks at the same time.
 	///
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
-	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
+	#[cfg(not(any(test, feature = "_test_utils")))]
+	per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<<K::Target as KeysInterface>::Signer>>>>,
+	#[cfg(any(test, feature = "_test_utils"))]
+	pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<<K::Target as KeysInterface>::Signer>>>>,
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
@@ -873,7 +911,7 @@ pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3;
 
 /// The number of ticks of [`ChannelManager::timer_tick_occurred`] until we time-out the
 /// idempotency of payments by [`PaymentId`]. See
-/// [`ChannelManager::remove_stale_resolved_payments`].
+/// [`OutboundPayments::remove_stale_resolved_payments`].
 pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7;
 
 /// Information needed for constructing an invoice route hint for this channel.
@@ -1127,6 +1165,7 @@ macro_rules! handle_error {
 				// entering the macro.
 				assert!($self.channel_state.try_lock().is_ok());
 				assert!($self.pending_events.try_lock().is_ok());
+				assert!($self.per_peer_state.try_write().is_ok());
 			}
 
 			let mut msg_events = Vec::with_capacity(2);
@@ -1156,7 +1195,31 @@ macro_rules! handle_error {
 			}
 
 			if !msg_events.is_empty() {
-				$self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events);
+				let per_peer_state = $self.per_peer_state.read().unwrap();
+				if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
+					let mut peer_state = peer_state_mutex.lock().unwrap();
+					peer_state.pending_msg_events.append(&mut msg_events);
+				}
+				#[cfg(debug_assertions)]
+				{
+					if let None = per_peer_state.get(&$counterparty_node_id) {
+						// This shouldn't occur in tests unless an unknown counterparty_node_id
+						// has been passed to our message handling functions.
+						let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", $counterparty_node_id);
+						match err.action {
+							msgs::ErrorAction::SendErrorMessage {
+								msg: msgs::ErrorMessage { ref channel_id, ref data }
+							}
+							=> {
+								assert_eq!(*data, expected_error_str);
+								if let Some((err_channel_id, _user_channel_id)) = chan_id {
+									assert_eq!(*channel_id, err_channel_id);
+								}
+							}
+							_ => panic!("Unexpected event"),
+						}
+					}
+				}
 			}
 
 			// Return error in case a higher-level API needs one
@@ -1356,12 +1419,14 @@ macro_rules! emit_channel_ready_event {
 	}
 }
 
-impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F, L>
-	where M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
-		T::Target: BroadcasterInterface,
-		K::Target: KeysInterface,
-		F::Target: FeeEstimator,
-		L::Target: Logger,
+impl<M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, K, F, R, L>
+where
+	M::Target: chain::Watch<<K::Target as KeysInterface>::Signer>,
+	T::Target: BroadcasterInterface,
+	K::Target: KeysInterface,
+	F::Target: FeeEstimator,
+	R::Target: Router,
+	L::Target: Logger,
 {
 	/// Constructs a new ChannelManager to hold several channels and route between them.
/// @@ -1373,7 +1438,7 @@ impl ChannelManager Self { + pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, keys_manager: K, config: UserConfig, params: ChainParameters) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes()); let inbound_pmt_key_material = keys_manager.get_inbound_payment_key_material(); @@ -1384,16 +1449,15 @@ impl ChannelManager ChannelManager ChannelManager { - let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); - let peer_state = peer_state.lock().unwrap(); - let their_features = &peer_state.latest_features; - let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration }; - match Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, - their_features, channel_value_satoshis, push_msat, user_channel_id, config, - self.best_block.read().unwrap().height(), outbound_scid_alias) - { - Ok(res) => res, - Err(e) => { - self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias); - return Err(e); - }, - } + let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); + let their_features = &peer_state.latest_features; + let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration }; + match Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, + their_features, channel_value_satoshis, push_msat, user_channel_id, config, + self.best_block.read().unwrap().height(), outbound_scid_alias) + { + Ok(res) => res, + Err(e) => { + self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias); + return Err(e); }, - None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", their_network_key) }), } }; let res = channel.get_open_channel(self.genesis_hash.clone()); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - // We want to make sure the lock is actually acquired by PersistenceNotifierGuard. - debug_assert!(&self.total_consistency_lock.try_write().is_err()); - let temporary_channel_id = channel.channel_id(); - let mut channel_state = self.channel_state.lock().unwrap(); - match channel_state.by_id.entry(temporary_channel_id) { + match peer_state.channel_by_id.entry(temporary_channel_id) { hash_map::Entry::Occupied(_) => { if cfg!(fuzzing) { return Err(APIError::APIMisuseError { err: "Fuzzy bad RNG".to_owned() }); @@ -1518,70 +1583,75 @@ impl ChannelManager { entry.insert(channel); } } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { + + peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: their_network_key, msg: res, }); Ok(temporary_channel_id) } - fn list_channels_with_filter::Signer>)) -> bool>(&self, f: Fn) -> Vec { + fn list_channels_with_filter::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec { let mut res = Vec::new(); + // Allocate our best estimate of the number of channels we have in the `res` + // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without + // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside + // of the ChannelMonitor handling. Therefore reallocations may still occur, but is + // unlikely as the `short_to_chan_info` map often contains 2 entries for + // the same channel. 
+ res.reserve(self.short_to_chan_info.read().unwrap().len()); { - let channel_state = self.channel_state.lock().unwrap(); let best_block_height = self.best_block.read().unwrap().height(); - res.reserve(channel_state.by_id.len()); - for (channel_id, channel) in channel_state.by_id.iter().filter(f) { - let balance = channel.get_available_balances(); - let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = - channel.get_holder_counterparty_selected_channel_reserve_satoshis(); - res.push(ChannelDetails { - channel_id: (*channel_id).clone(), - counterparty: ChannelCounterparty { - node_id: channel.get_counterparty_node_id(), - features: InitFeatures::empty(), - unspendable_punishment_reserve: to_remote_reserve_satoshis, - forwarding_info: channel.counterparty_forwarding_info(), - // Ensures that we have actually received the `htlc_minimum_msat` value - // from the counterparty through the `OpenChannel` or `AcceptChannel` - // message (as they are always the first message from the counterparty). - // Else `Channel::get_counterparty_htlc_minimum_msat` could return the - // default `0` value set by `Channel::new_outbound`. - outbound_htlc_minimum_msat: if channel.have_received_message() { - Some(channel.get_counterparty_htlc_minimum_msat()) } else { None }, - outbound_htlc_maximum_msat: channel.get_counterparty_htlc_maximum_msat(), - }, - funding_txo: channel.get_funding_txo(), - // Note that accept_channel (or open_channel) is always the first message, so - // `have_received_message` indicates that type negotiation has completed. - channel_type: if channel.have_received_message() { Some(channel.get_channel_type().clone()) } else { None }, - short_channel_id: channel.get_short_channel_id(), - outbound_scid_alias: if channel.is_usable() { Some(channel.outbound_scid_alias()) } else { None }, - inbound_scid_alias: channel.latest_inbound_scid_alias(), - channel_value_satoshis: channel.get_value_satoshis(), - unspendable_punishment_reserve: to_self_reserve_satoshis, - balance_msat: balance.balance_msat, - inbound_capacity_msat: balance.inbound_capacity_msat, - outbound_capacity_msat: balance.outbound_capacity_msat, - next_outbound_htlc_limit_msat: balance.next_outbound_htlc_limit_msat, - user_channel_id: channel.get_user_id(), - confirmations_required: channel.minimum_depth(), - confirmations: Some(channel.get_funding_tx_confirmations(best_block_height)), - force_close_spend_delay: channel.get_counterparty_selected_contest_delay(), - is_outbound: channel.is_outbound(), - is_channel_ready: channel.is_usable(), - is_usable: channel.is_live(), - is_public: channel.should_announce(), - inbound_htlc_minimum_msat: Some(channel.get_holder_htlc_minimum_msat()), - inbound_htlc_maximum_msat: channel.get_holder_htlc_maximum_msat(), - config: Some(channel.config()), - }); - } - } - let per_peer_state = self.per_peer_state.read().unwrap(); - for chan in res.iter_mut() { - if let Some(peer_state) = per_peer_state.get(&chan.counterparty.node_id) { - chan.counterparty.features = peer_state.lock().unwrap().latest_features.clone(); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { + let balance = channel.get_available_balances(); + let (to_remote_reserve_satoshis, to_self_reserve_satoshis) = + channel.get_holder_counterparty_selected_channel_reserve_satoshis(); + 
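+					// Note (not from this commit): the first tuple element above is the reserve the
+					// counterparty must maintain, reported below as
+					// `counterparty.unspendable_punishment_reserve`; the second is the reserve we
+					// must maintain, reported as the top-level `unspendable_punishment_reserve`.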
res.push(ChannelDetails { + channel_id: (*channel_id).clone(), + counterparty: ChannelCounterparty { + node_id: channel.get_counterparty_node_id(), + features: peer_state.latest_features.clone(), + unspendable_punishment_reserve: to_remote_reserve_satoshis, + forwarding_info: channel.counterparty_forwarding_info(), + // Ensures that we have actually received the `htlc_minimum_msat` value + // from the counterparty through the `OpenChannel` or `AcceptChannel` + // message (as they are always the first message from the counterparty). + // Else `Channel::get_counterparty_htlc_minimum_msat` could return the + // default `0` value set by `Channel::new_outbound`. + outbound_htlc_minimum_msat: if channel.have_received_message() { + Some(channel.get_counterparty_htlc_minimum_msat()) } else { None }, + outbound_htlc_maximum_msat: channel.get_counterparty_htlc_maximum_msat(), + }, + funding_txo: channel.get_funding_txo(), + // Note that accept_channel (or open_channel) is always the first message, so + // `have_received_message` indicates that type negotiation has completed. + channel_type: if channel.have_received_message() { Some(channel.get_channel_type().clone()) } else { None }, + short_channel_id: channel.get_short_channel_id(), + outbound_scid_alias: if channel.is_usable() { Some(channel.outbound_scid_alias()) } else { None }, + inbound_scid_alias: channel.latest_inbound_scid_alias(), + channel_value_satoshis: channel.get_value_satoshis(), + unspendable_punishment_reserve: to_self_reserve_satoshis, + balance_msat: balance.balance_msat, + inbound_capacity_msat: balance.inbound_capacity_msat, + outbound_capacity_msat: balance.outbound_capacity_msat, + next_outbound_htlc_limit_msat: balance.next_outbound_htlc_limit_msat, + user_channel_id: channel.get_user_id(), + confirmations_required: channel.minimum_depth(), + confirmations: Some(channel.get_funding_tx_confirmations(best_block_height)), + force_close_spend_delay: channel.get_counterparty_selected_contest_delay(), + is_outbound: channel.is_outbound(), + is_channel_ready: channel.is_usable(), + is_usable: channel.is_live(), + is_public: channel.should_announce(), + inbound_htlc_minimum_msat: Some(channel.get_holder_htlc_minimum_msat()), + inbound_htlc_maximum_msat: channel.get_holder_htlc_maximum_msat(), + config: Some(channel.config()), + }); + } } } res @@ -1609,7 +1679,7 @@ impl ChannelManager::Signer>, closure_reason: ClosureReason) { + fn issue_channel_close_events(&self, channel: &Channel<::Signer>, closure_reason: ClosureReason) { let mut pending_events_lock = self.pending_events.lock().unwrap(); match channel.unbroadcasted_funding() { Some(transaction) => { @@ -1631,22 +1701,18 @@ impl ChannelManager = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - match channel_state.by_id.entry(channel_id.clone()) { + let per_peer_state = self.per_peer_state.read().unwrap(); + + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); + } + + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - if *counterparty_node_id != chan_entry.get().get_counterparty_node_id(){ - return Err(APIError::APIMisuseError { err: "The 
passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() }); - } - let (shutdown_msg, monitor_update, htlcs) = { - let per_peer_state = self.per_peer_state.read().unwrap(); - match per_peer_state.get(&counterparty_node_id) { - Some(peer_state) => { - let peer_state = peer_state.lock().unwrap(); - let their_features = &peer_state.latest_features; - chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)? - }, - None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }), - } - }; + let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.keys_manager, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?; failed_htlcs = htlcs; // Update the monitor with the shutdown script if necessary. @@ -1660,7 +1726,7 @@ impl ChannelManager ChannelManager ChannelManager return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}) + hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), counterparty_node_id) }) } }; @@ -1757,13 +1823,15 @@ impl ChannelManager, broadcast: bool) -> Result { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(peer_node_id); let mut chan = { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - if let hash_map::Entry::Occupied(chan) = channel_state.by_id.entry(channel_id.clone()) { - if chan.get().get_counterparty_node_id() != *peer_node_id { - return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}); - } + if let None = peer_state_mutex_opt { + return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) }); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { if let Some(peer_msg) = peer_msg { self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() }); } else { @@ -1771,14 +1839,14 @@ impl ChannelManager ChannelManager { - self.channel_state.lock().unwrap().pending_msg_events.push( - events::MessageSendEvent::HandleError { - node_id: counterparty_node_id, - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() } - }, - } - ); + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + peer_state.pending_msg_events.push( + events::MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() } + }, + } + ); + } Ok(()) }, Err(e) => Err(e) @@ -2039,8 +2111,7 @@ impl ChannelManager { // unknown_next_peer // Note that this is likely a timing oracle for detecting whether an scid is a // phantom or an intercept. 
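// A minimal sketch (not from this commit) of the lookup pattern the new code repeats:
// resolve the peer under the `per_peer_state` read lock, then find the channel in that
// peer's `channel_by_id`. The helper name and the bare `Ok(())` body are illustrative
// only; the error strings mirror the ones introduced above, and the surrounding
// module's imports are assumed.
fn with_channel<Signer: Sign>(
	per_peer_state: &FairRwLock<HashMap<PublicKey, Mutex<PeerState<Signer>>>>,
	counterparty_node_id: &PublicKey, channel_id: &[u8; 32],
) -> Result<(), APIError> {
	let per_peer_state = per_peer_state.read().unwrap();
	let peer_state_mutex = per_peer_state.get(counterparty_node_id)
		.ok_or_else(|| APIError::APIMisuseError {
			err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id)
		})?;
	let mut peer_state = peer_state_mutex.lock().unwrap();
	match peer_state.channel_by_id.get_mut(channel_id) {
		// Both the outer read lock and the inner peer lock are held here; operate on
		// the channel, then let the guards drop.
		Some(_chan) => Ok(()),
		None => Err(APIError::ChannelUnavailable {
			err: format!("Channel with id {} not found for the passed counterparty node_id {}",
				log_bytes!(*channel_id), counterparty_node_id)
		}),
	}
}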
@@ -2053,13 +2124,20 @@ impl ChannelManager Some(chan_id.clone()), + Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), }; - let chan_update_opt = if let Some(forwarding_id) = forwarding_id_opt { - let chan = match channel_state.by_id.get_mut(&forwarding_id) { + let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if let None = peer_state_mutex_opt { + break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) { None => { - // Channel was removed. The short_to_chan_info and by_id maps have - // no consistency guarantees. + // Channel was removed. The short_to_chan_info and channel_by_id maps + // have no consistency guarantees. break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); }, Some(chan) => chan @@ -2165,8 +2243,8 @@ impl ChannelManager::Signer>) -> Result { + /// May be called with peer_state already locked! + fn get_channel_update_for_broadcast(&self, chan: &Channel<::Signer>) -> Result { if !chan.should_announce() { return Err(LightningError { err: "Cannot broadcast a channel_update for a private channel".to_owned(), @@ -2184,8 +2262,8 @@ impl ChannelManager::Signer>) -> Result { + /// May be called with peer_state already locked! + fn get_channel_update_for_unicast(&self, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id())); let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) { None => return Err(LightningError{err: "Channel not yet established".to_owned(), action: msgs::ErrorAction::IgnoreError}), @@ -2194,7 +2272,7 @@ impl ChannelManager::Signer>) -> Result { + fn get_channel_update_for_onion(&self, short_channel_id: u64, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Generating channel update for channel {}", log_bytes!(chan.channel_id())); let were_node_one = PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key).serialize()[..] < chan.get_counterparty_node_id().serialize()[..]; @@ -2237,18 +2315,22 @@ impl ChannelManager = loop { - let id = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) { + let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}), - Some((_cp_id, chan_id)) => chan_id.clone(), + Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), }; let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; - if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(APIError::InvalidRoute{err: "No peer matching the path's first hop found!" 
}); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { match { - if chan.get().get_counterparty_node_id() != path.first().unwrap().pubkey { - return Err(APIError::InvalidRoute{err: "Node ID mismatch on first hop!"}); - } if !chan.get().is_live() { return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()}); } @@ -2285,7 +2367,7 @@ impl ChannelManager ChannelManager ChannelManager, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { - let onion_session_privs = self.add_new_pending_payment(payment_hash, *payment_secret, payment_id, route)?; - self.send_payment_internal(route, payment_hash, payment_secret, None, payment_id, None, onion_session_privs) + let best_block_height = self.best_block.read().unwrap().height(); + self.pending_outbound_payments + .send_payment_with_route(route, payment_hash, payment_secret, payment_id, &self.keys_manager, best_block_height, + |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| + self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } #[cfg(test)] - pub(crate) fn test_add_new_pending_payment(&self, payment_hash: PaymentHash, payment_secret: Option, payment_id: PaymentId, route: &Route) -> Result, PaymentSendFailure> { - self.add_new_pending_payment(payment_hash, payment_secret, payment_id, route) + fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { + let best_block_height = self.best_block.read().unwrap().height(); + self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, payment_secret, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.keys_manager, best_block_height, + |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| + self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } - fn add_new_pending_payment(&self, payment_hash: PaymentHash, payment_secret: Option, payment_id: PaymentId, route: &Route) -> Result, PaymentSendFailure> { - let mut onion_session_privs = Vec::with_capacity(route.paths.len()); - for _ in 0..route.paths.len() { - onion_session_privs.push(self.keys_manager.get_secure_random_bytes()); - } - - let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); - match pending_outbounds.entry(payment_id) { - hash_map::Entry::Occupied(_) => Err(PaymentSendFailure::DuplicatePayment), - hash_map::Entry::Vacant(entry) => { - let payment = entry.insert(PendingOutboundPayment::Retryable { - session_privs: HashSet::new(), - pending_amt_msat: 0, - pending_fee_msat: Some(0), - payment_hash, - payment_secret, - starting_block_height: self.best_block.read().unwrap().height(), - total_msat: route.get_total_amount(), - }); - - for (path, session_priv_bytes) in route.paths.iter().zip(onion_session_privs.iter()) { - assert!(payment.insert(*session_priv_bytes, path)); - } - - Ok(onion_session_privs) - }, - } + #[cfg(test)] + pub(crate) fn 
test_add_new_pending_payment(&self, payment_hash: PaymentHash, payment_secret: Option, payment_id: PaymentId, route: &Route) -> Result, PaymentSendFailure> { + let best_block_height = self.best_block.read().unwrap().height(); + self.pending_outbound_payments.test_add_new_pending_payment(payment_hash, payment_secret, payment_id, route, &self.keys_manager, best_block_height) } - fn send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { - if route.paths.len() < 1 { - return Err(PaymentSendFailure::ParameterError(APIError::InvalidRoute{err: "There must be at least one path to send over"})); - } - if payment_secret.is_none() && route.paths.len() > 1 { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError{err: "Payment secret is required for multi-path payments".to_string()})); - } - let mut total_value = 0; - let our_node_id = self.get_our_node_id(); - let mut path_errs = Vec::with_capacity(route.paths.len()); - 'path_check: for path in route.paths.iter() { - if path.len() < 1 || path.len() > 20 { - path_errs.push(Err(APIError::InvalidRoute{err: "Path didn't go anywhere/had bogus size"})); - continue 'path_check; - } - for (idx, hop) in path.iter().enumerate() { - if idx != path.len() - 1 && hop.pubkey == our_node_id { - path_errs.push(Err(APIError::InvalidRoute{err: "Path went through us but wasn't a simple rebalance loop to us"})); - continue 'path_check; - } - } - total_value += path.last().unwrap().fee_msat; - path_errs.push(Ok(())); - } - if path_errs.iter().any(|e| e.is_err()) { - return Err(PaymentSendFailure::PathParameterError(path_errs)); - } - if let Some(amt_msat) = recv_value_msat { - debug_assert!(amt_msat >= total_value); - total_value = amt_msat; - } - - let cur_height = self.best_block.read().unwrap().height() + 1; - let mut results = Vec::new(); - debug_assert_eq!(route.paths.len(), onion_session_privs.len()); - for (path, session_priv) in route.paths.iter().zip(onion_session_privs.into_iter()) { - let mut path_res = self.send_payment_along_path(&path, &route.payment_params, &payment_hash, payment_secret, total_value, cur_height, payment_id, &keysend_preimage, session_priv); - match path_res { - Ok(_) => {}, - Err(APIError::MonitorUpdateInProgress) => { - // While a MonitorUpdateInProgress is an Err(_), the payment is still - // considered "in flight" and we shouldn't remove it from the - // PendingOutboundPayment set. - }, - Err(_) => { - let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); - if let Some(payment) = pending_outbounds.get_mut(&payment_id) { - let removed = payment.remove(&session_priv, Some(path)); - debug_assert!(removed, "This can't happen as the payment has an entry for this path added by callers"); - } else { - debug_assert!(false, "This can't happen as the payment was added by callers"); - path_res = Err(APIError::APIMisuseError { err: "Internal error: payment disappeared during processing. 
Please report this bug!".to_owned() }); - } - } - } - results.push(path_res); - } - let mut has_ok = false; - let mut has_err = false; - let mut pending_amt_unsent = 0; - let mut max_unsent_cltv_delta = 0; - for (res, path) in results.iter().zip(route.paths.iter()) { - if res.is_ok() { has_ok = true; } - if res.is_err() { has_err = true; } - if let &Err(APIError::MonitorUpdateInProgress) = res { - // MonitorUpdateInProgress is inherently unsafe to retry, so we call it a - // PartialFailure. - has_err = true; - has_ok = true; - } else if res.is_err() { - pending_amt_unsent += path.last().unwrap().fee_msat; - max_unsent_cltv_delta = cmp::max(max_unsent_cltv_delta, path.last().unwrap().cltv_expiry_delta); - } - } - if has_err && has_ok { - Err(PaymentSendFailure::PartialFailure { - results, - payment_id, - failed_paths_retry: if pending_amt_unsent != 0 { - if let Some(payment_params) = &route.payment_params { - Some(RouteParameters { - payment_params: payment_params.clone(), - final_value_msat: pending_amt_unsent, - final_cltv_expiry_delta: max_unsent_cltv_delta, - }) - } else { None } - } else { None }, - }) - } else if has_err { - // If we failed to send any paths, we should remove the new PaymentId from the - // `pending_outbound_payments` map, as the user isn't expected to `abandon_payment`. - let removed = self.pending_outbound_payments.lock().unwrap().remove(&payment_id).is_some(); - debug_assert!(removed, "We should always have a pending payment to remove here"); - Err(PaymentSendFailure::AllFailedResendSafe(results.drain(..).map(|r| r.unwrap_err()).collect())) - } else { - Ok(()) - } - } /// Retries a payment along the given [`Route`]. /// @@ -2514,64 +2482,10 @@ impl ChannelManager Result<(), PaymentSendFailure> { - const RETRY_OVERFLOW_PERCENTAGE: u64 = 10; - for path in route.paths.iter() { - if path.len() == 0 { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "length-0 path in route".to_string() - })) - } - } - - let mut onion_session_privs = Vec::with_capacity(route.paths.len()); - for _ in 0..route.paths.len() { - onion_session_privs.push(self.keys_manager.get_secure_random_bytes()); - } - - let (total_msat, payment_hash, payment_secret) = { - let mut outbounds = self.pending_outbound_payments.lock().unwrap(); - match outbounds.get_mut(&payment_id) { - Some(payment) => { - let res = match payment { - PendingOutboundPayment::Retryable { - total_msat, payment_hash, payment_secret, pending_amt_msat, .. - } => { - let retry_amt_msat: u64 = route.paths.iter().map(|path| path.last().unwrap().fee_msat).sum(); - if retry_amt_msat + *pending_amt_msat > *total_msat * (100 + RETRY_OVERFLOW_PERCENTAGE) / 100 { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: format!("retry_amt_msat of {} will put pending_amt_msat (currently: {}) more than 10% over total_payment_amt_msat of {}", retry_amt_msat, pending_amt_msat, total_msat).to_string() - })) - } - (*total_msat, *payment_hash, *payment_secret) - }, - PendingOutboundPayment::Legacy { .. } => { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string() - })) - }, - PendingOutboundPayment::Fulfilled { .. } => { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "Payment already completed".to_owned() - })); - }, - PendingOutboundPayment::Abandoned { .. 
} => { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "Payment already abandoned (with some HTLCs still pending)".to_owned() - })); - }, - }; - for (path, session_priv_bytes) in route.paths.iter().zip(onion_session_privs.iter()) { - assert!(payment.insert(*session_priv_bytes, path)); - } - res - }, - None => - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: format!("Payment with ID {} not found", log_bytes!(payment_id.0)), - })), - } - }; - self.send_payment_internal(route, payment_hash, &payment_secret, None, payment_id, Some(total_msat), onion_session_privs) + let best_block_height = self.best_block.read().unwrap().height(); + self.pending_outbound_payments.retry_payment_with_route(route, payment_id, &self.keys_manager, best_block_height, + |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| + self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } /// Signals that no further retries for the given payment will occur. @@ -2596,18 +2510,8 @@ impl ChannelManager ChannelManager, payment_id: PaymentId) -> Result { - let preimage = match payment_preimage { - Some(p) => p, - None => PaymentPreimage(self.keys_manager.get_secure_random_bytes()), - }; - let payment_hash = PaymentHash(Sha256::hash(&preimage.0).into_inner()); - let onion_session_privs = self.add_new_pending_payment(payment_hash, None, payment_id, &route)?; - - match self.send_payment_internal(route, payment_hash, &None, Some(preimage), payment_id, None, onion_session_privs) { - Ok(()) => Ok(payment_hash), - Err(e) => Err(e) - } + let best_block_height = self.best_block.read().unwrap().height(); + self.pending_outbound_payments.send_spontaneous_payment(route, payment_preimage, payment_id, &self.keys_manager, best_block_height, + |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| + self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } /// Send a payment that is probing the given route for liquidity. We calculate the /// [`PaymentHash`] of probes based on a static secret and a random [`PaymentId`], which allows /// us to easily discern them from real payments. 
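	/// Concretely (a sketch mirroring the `probing_cookie_from_id` logic that this
	/// commit moves into the `outbound_payment` module), the probe hash is
	/// `SHA256(probing_cookie_secret || payment_id.0)`:
	///
	/// ```ignore
	/// let mut preimage = [0u8; 64];
	/// preimage[..32].copy_from_slice(&probing_cookie_secret);
	/// preimage[32..].copy_from_slice(&payment_id.0);
	/// let probe_hash = PaymentHash(Sha256::hash(&preimage).into_inner());
	/// ```
	///
	/// A returned `PaymentId` is thus enough to recompute and recognize a probe's
	/// `PaymentHash`.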
pub fn send_probe(&self, hops: Vec) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { - let payment_id = PaymentId(self.keys_manager.get_secure_random_bytes()); - - let payment_hash = self.probing_cookie_from_id(&payment_id); - - if hops.len() < 2 { - return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { - err: "No need probing a path with less than two hops".to_string() - })) - } - - let route = Route { paths: vec![hops], payment_params: None }; - let onion_session_privs = self.add_new_pending_payment(payment_hash, None, payment_id, &route)?; - - match self.send_payment_internal(&route, payment_hash, &None, None, payment_id, None, onion_session_privs) { - Ok(()) => Ok((payment_hash, payment_id)), - Err(e) => Err(e) - } + let best_block_height = self.best_block.read().unwrap().height(); + self.pending_outbound_payments.send_probe(hops, self.probing_cookie_secret, &self.keys_manager, best_block_height, + |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv| + self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } /// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a /// payment probe. + #[cfg(test)] pub(crate) fn payment_is_probe(&self, payment_hash: &PaymentHash, payment_id: &PaymentId) -> bool { - let target_payment_hash = self.probing_cookie_from_id(payment_id); - target_payment_hash == *payment_hash - } - - /// Returns the 'probing cookie' for the given [`PaymentId`]. - fn probing_cookie_from_id(&self, payment_id: &PaymentId) -> PaymentHash { - let mut preimage = [0u8; 64]; - preimage[..32].copy_from_slice(&self.probing_cookie_secret); - preimage[32..].copy_from_slice(&payment_id.0); - PaymentHash(Sha256::hash(&preimage).into_inner()) + outbound_payment::payment_is_probe(payment_hash, payment_id, self.probing_cookie_secret) } /// Handles the generation of a funding transaction, optionally (for tests) with a function /// which checks the correctness of the funding transaction given the associated channel. 
- fn funding_transaction_generated_intern::Signer>, &Transaction) -> Result>( - &self, temporary_channel_id: &[u8; 32], _counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput + fn funding_transaction_generated_intern::Signer>, &Transaction) -> Result>( + &self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput ) -> Result<(), APIError> { + let mut channel_state = self.channel_state.lock().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }) + } + + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; let (chan, msg) = { - let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { - Some(mut chan) => { - let funding_txo = find_funding_output(&chan, &funding_transaction)?; - - (chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger) - .map_err(|e| if let ChannelError::Close(msg) = e { - MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.get_user_id(), chan.force_shutdown(true), None) - } else { unreachable!(); }) - , chan) - }, - None => { return Err(APIError::ChannelUnavailable { err: "No such channel".to_owned() }) }, + let (res, chan) = { + match peer_state.channel_by_id.remove(temporary_channel_id) { + Some(mut chan) => { + let funding_txo = find_funding_output(&chan, &funding_transaction)?; + + (chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger) + .map_err(|e| if let ChannelError::Close(msg) = e { + MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.get_user_id(), chan.force_shutdown(true), None) + } else { unreachable!(); }) + , chan) + }, + None => { return Err(APIError::ChannelUnavailable { err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*temporary_channel_id), counterparty_node_id) }) }, + } }; match handle_error!(self, res, chan.get_counterparty_node_id()) { Ok(funding_msg) => { @@ -2706,12 +2593,12 @@ impl ChannelManager { panic!("Generated duplicate funding txid?"); }, @@ -2845,34 +2732,34 @@ impl ChannelManager ChannelManager Result<(), APIError> { + pub fn forward_intercepted_htlc(&self, intercept_id: InterceptId, next_hop_channel_id: &[u8; 32], next_node_id: PublicKey, amt_to_forward_msat: u64) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let next_hop_scid = match self.channel_state.lock().unwrap().by_id.get(next_hop_channel_id) { - Some(chan) => { - if !chan.is_usable() { - return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) + let next_hop_scid = { + let peer_state_lock = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = peer_state_lock.get(&next_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.get(next_hop_channel_id) { + Some(chan) => { + if !chan.is_usable() { + return Err(APIError::ChannelUnavailable { + err: format!("Channel 
with id {} not fully established", log_bytes!(*next_hop_channel_id)) + }) + } + chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) + }, + None => return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) }) } - chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) - }, - None => return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not found", log_bytes!(*next_hop_channel_id)) - }) + } else { + return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) }); + } }; let payment = self.pending_intercepted_htlcs.lock().unwrap().remove(&intercept_id) @@ -3084,16 +2980,22 @@ impl ChannelManager chan_id.clone(), + let (counterparty_node_id, forward_chan_id) = match self.short_to_chan_info.read().unwrap().get(&short_chan_id) { + Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { forwarding_channel_not_found!(); continue; } }; - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - match channel_state.by_id.entry(forward_chan_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if let None = peer_state_mutex_opt { + forwarding_channel_not_found!(); + continue; + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(forward_chan_id) { hash_map::Entry::Vacant(_) => { forwarding_channel_not_found!(); continue; @@ -3409,7 +3311,7 @@ impl ChannelManager::Signer>, new_feerate: u32) -> NotifyOption { + fn update_channel_fee(&self, chan_id: &[u8; 32], chan: &mut Channel<::Signer>, new_feerate: u32) -> NotifyOption { if !chan.is_outbound() { return NotifyOption::SkipPersist; } // If the feerate has decreased by less than half, don't bother if new_feerate <= chan.get_feerate() && new_feerate * 2 > chan.get_feerate() { @@ -3440,55 +3342,20 @@ impl ChannelManager { - if payment_id == ev_payment_id { - no_remaining_entries = false; - break; - } - }, - _ => {}, - } - } - } - if no_remaining_entries { - *timer_ticks_without_htlcs += 1; - *timer_ticks_without_htlcs <= IDEMPOTENCY_TIMEOUT_TICKS - } else { - *timer_ticks_without_htlcs = 0; - true - } - } else { true } - }); - } - /// Performs actions which should happen on startup and roughly once per minute thereafter. 
/// /// This currently includes: @@ -3513,47 +3380,52 @@ impl ChannelManager chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged), - ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged), - ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), - ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), - ChannelUpdateStatus::DisabledStaged if !chan.is_live() => { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - should_persist = NotifyOption::DoPersist; - chan.set_channel_update_status(ChannelUpdateStatus::Disabled); - }, - ChannelUpdateStatus::EnabledStaged if chan.is_live() => { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - should_persist = NotifyOption::DoPersist; - chan.set_channel_update_status(ChannelUpdateStatus::Enabled); - }, - _ => {}, - } + match chan.channel_update_status() { + ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged), + ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged), + ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), + ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), + ChannelUpdateStatus::DisabledStaged if !chan.is_live() => { + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + chan.set_channel_update_status(ChannelUpdateStatus::Disabled); + }, + ChannelUpdateStatus::EnabledStaged if chan.is_live() => { + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + chan.set_channel_update_status(ChannelUpdateStatus::Enabled); + }, + _ => {}, + } - chan.maybe_expire_prev_config(); + chan.maybe_expire_prev_config(); - true - }); + true + }); + } } self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| { @@ -3589,7 +3461,7 @@ impl ChannelManager ChannelManager::Signer>) -> (u16, Vec) { + fn get_htlc_inbound_temp_fail_err_and_data(&self, desired_err_code: u16, chan: &Channel<::Signer>) -> (u16, Vec) { // We can't be sure what SCID was used when relaying inbound towards us, so we have to // guess somewhat. 
If its a public channel, we figure best to just use the real SCID (as // we're not leaking that we have a channel with the counterparty), otherwise we try to use @@ -3656,7 +3528,7 @@ impl ChannelManager::Signer>) -> (u16, Vec) { + fn get_htlc_temp_fail_err_and_data(&self, desired_err_code: u16, scid: u64, chan: &Channel<::Signer>) -> (u16, Vec) { debug_assert_eq!(desired_err_code & 0x1000, 0x1000); if let Ok(upd) = self.get_channel_update_for_onion(scid, chan) { let mut enc = VecWriter(Vec::with_capacity(upd.serialized_length() + 6)); @@ -3685,13 +3557,19 @@ impl ChannelManager, channel_id: [u8; 32], counterparty_node_id: &PublicKey ) { - let (failure_code, onion_failure_data) = - match self.channel_state.lock().unwrap().by_id.entry(channel_id) { - hash_map::Entry::Occupied(chan_entry) => { - self.get_htlc_inbound_temp_fail_err_and_data(0x1000|7, &chan_entry.get()) - }, - hash_map::Entry::Vacant(_) => (0x4000|10, Vec::new()) - }; + let (failure_code, onion_failure_data) = { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(channel_id) { + hash_map::Entry::Occupied(chan_entry) => { + self.get_htlc_inbound_temp_fail_err_and_data(0x1000|7, &chan_entry.get()) + }, + hash_map::Entry::Vacant(_) => (0x4000|10, Vec::new()) + } + } else { (0x4000|10, Vec::new()) } + }; for (htlc_src, payment_hash) in htlcs_to_fail.drain(..) { let reason = HTLCFailReason::reason(failure_code, onion_failure_data.clone()); @@ -3705,11 +3583,13 @@ impl ChannelManager ChannelManager { - let mut session_priv_bytes = [0; 32]; - session_priv_bytes.copy_from_slice(&session_priv[..]); - let mut outbounds = self.pending_outbound_payments.lock().unwrap(); - let mut all_paths_failed = false; - let mut full_failure_ev = None; - if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(*payment_id) { - if !payment.get_mut().remove(&session_priv_bytes, Some(&path)) { - log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0)); - return; - } - if payment.get().is_fulfilled() { - log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0)); - return; - } - if payment.get().remaining_parts() == 0 { - all_paths_failed = true; - if payment.get().abandoned() { - full_failure_ev = Some(events::Event::PaymentFailed { - payment_id: *payment_id, - payment_hash: payment.get().payment_hash().expect("PendingOutboundPayments::RetriesExceeded always has a payment hash set"), - }); - payment.remove(); - } - } - } else { - log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0)); - return; - } - let mut retry = if let Some(payment_params_data) = payment_params { - let path_last_hop = path.last().expect("Outbound payments must have had a valid path"); - Some(RouteParameters { - payment_params: payment_params_data.clone(), - final_value_msat: path_last_hop.fee_msat, - final_cltv_expiry_delta: path_last_hop.cltv_expiry_delta, - }) - } else { None }; - log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0)); - - let path_failure = { -#[cfg(test)] - let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_error.decode_onion_failure(&self.secp_ctx, 
&self.logger, &source); -#[cfg(not(test))] - let (network_update, short_channel_id, payment_retryable, _, _) = onion_error.decode_onion_failure(&self.secp_ctx, &self.logger, &source); - - if self.payment_is_probe(payment_hash, &payment_id) { - if !payment_retryable { - events::Event::ProbeSuccessful { - payment_id: *payment_id, - payment_hash: payment_hash.clone(), - path: path.clone(), - } - } else { - events::Event::ProbeFailed { - payment_id: *payment_id, - payment_hash: payment_hash.clone(), - path: path.clone(), - short_channel_id, - } - } - } else { - // TODO: If we decided to blame ourselves (or one of our channels) in - // process_onion_failure we should close that channel as it implies our - // next-hop is needlessly blaming us! - if let Some(scid) = short_channel_id { - retry.as_mut().map(|r| r.payment_params.previously_failed_channels.push(scid)); - } - events::Event::PaymentPathFailed { - payment_id: Some(*payment_id), - payment_hash: payment_hash.clone(), - payment_failed_permanently: !payment_retryable, - network_update, - all_paths_failed, - path: path.clone(), - short_channel_id, - retry, - #[cfg(test)] - error_code: onion_error_code, - #[cfg(test)] - error_data: onion_error_data - } - } - }; - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(path_failure); - if let Some(ev) = full_failure_ev { pending_events.push(ev); } + self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx, &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, ref phantom_shared_secret, ref outpoint }) => { log_trace!(self.logger, "Failing HTLC with payment_hash {} backwards from us with {:?}", log_bytes!(payment_hash.0), onion_error); @@ -3909,16 +3705,26 @@ impl ChannelManager chan_id.clone(), + let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) { + Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { valid_mpp = false; break; } }; - if let None = channel_state.as_ref().unwrap().by_id.get(&chan_id) { + if let None = per_peer_state.as_ref().unwrap().get(&counterparty_node_id) { + valid_mpp = false; + break; + } + + let peer_state_mutex = per_peer_state.as_ref().unwrap().get(&counterparty_node_id).unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + + if let None = peer_state.channel_by_id.get(&chan_id) { valid_mpp = false; break; } @@ -3929,6 +3735,7 @@ impl ChannelManager ChannelManager ChannelManager ChannelManager ChannelManager) -> Option>(&self, - mut channel_state_lock: MutexGuard::Signer>>, + mut channel_state_lock: MutexGuard, + per_peer_state_lock: RwLockReadGuard::Signer>>>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc) -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! 
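The per-peer refactor visible throughout this hunk replaces the single by_id map with a two-step lookup: the SCID index yields a counterparty node id, whose per-peer mutex then yields the channel. A simplified sketch of that double lookup using plain std containers (hypothetical stand-in types, not LDK's real maps):

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

type NodeId = [u8; 33];
type ChannelId = [u8; 32];
struct Channel; // stand-in

struct PerPeerMaps {
    short_to_chan_info: RwLock<HashMap<u64, (NodeId, ChannelId)>>,
    per_peer_state: RwLock<HashMap<NodeId, Mutex<HashMap<ChannelId, Channel>>>>,
}

impl PerPeerMaps {
    // Returns true iff the SCID resolves to a channel we still have.
    fn channel_exists(&self, scid: u64) -> bool {
        // Step 1: SCID -> (counterparty node id, channel id).
        let (cp_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&scid) {
            Some(v) => *v,
            None => return false,
        };
        // Step 2: counterparty node id -> per-peer channel map.
        match self.per_peer_state.read().unwrap().get(&cp_id) {
            Some(peer_mutex) => peer_mutex.lock().unwrap().contains_key(&chan_id),
            None => false,
        }
    }
}

fn main() {
    let maps = PerPeerMaps {
        short_to_chan_info: RwLock::new(HashMap::new()),
        per_peer_state: RwLock::new(HashMap::new()),
    };
    assert!(!maps.channel_exists(42));
}

Either map can go stale independently (a peer may disconnect, a channel may close), which is why the MPP claim check above treats a miss at either step as an invalid part.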
let chan_id = prev_hop.outpoint.to_channel_id(); let channel_state = &mut *channel_state_lock; - if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) { - let counterparty_node_id = chan.get().get_counterparty_node_id(); - match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { - Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { + + let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { + Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()), + None => None + }; + + let (found_channel, mut peer_state_opt) = if counterparty_node_id_opt.is_some() && per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).is_some() { + let peer_mutex = per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).unwrap(); + let peer_state = peer_mutex.lock().unwrap(); + let found_channel = peer_state.channel_by_id.contains_key(&chan_id); + (found_channel, Some(peer_state)) + } else { (false, None) }; + + if found_channel { + let peer_state = &mut *peer_state_opt.as_mut().unwrap(); + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { + let counterparty_node_id = chan.get().get_counterparty_node_id(); + match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { + Ok(msgs_monitor_option) => { + if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { + match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { + ChannelMonitorUpdateStatus::Completed => {}, + e => { + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, + "Failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, e); + let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); + mem::drop(channel_state_lock); + mem::drop(peer_state_opt); + mem::drop(per_peer_state_lock); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + return Err((counterparty_node_id, err)); + } + } + if let Some((msg, commitment_signed)) = msgs { + log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", + log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); + peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: counterparty_node_id, + updates: msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: vec![msg], + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + } + }); + } + mem::drop(channel_state_lock); + mem::drop(peer_state_opt); + mem::drop(per_peer_state_lock); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + Ok(()) + } else { + Ok(()) + } + }, + Err((e, monitor_update)) => { match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { ChannelMonitorUpdateStatus::Completed => {}, e => { - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, - "Failed to update channel monitor with preimage {:?}: {:?}", + // TODO: This needs to be handled 
somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same update and try + // again on restart. + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, + "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", payment_preimage, e); - let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); - mem::drop(channel_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - return Err((counterparty_node_id, err)); - } + }, } - if let Some((msg, commitment_signed)) = msgs { - log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", - log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get().get_counterparty_node_id(), - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - } - }); + let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); + if drop { + chan.remove_entry(); } mem::drop(channel_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - Ok(()) - } else { - Ok(()) - } - }, - Err((e, monitor_update)) => { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same update and try - // again on restart. - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, - "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", - payment_preimage, e); - }, - } - let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); - if drop { - chan.remove_entry(); - } - mem::drop(channel_state_lock); - self.handle_monitor_update_completion_actions(completion_action(None)); - Err((counterparty_node_id, res)) - }, + mem::drop(peer_state_opt); + mem::drop(per_peer_state_lock); + self.handle_monitor_update_completion_actions(completion_action(None)); + Err((counterparty_node_id, res)) + }, + } + } else { + // We've held the peer_state mutex since finding the channel and setting + // found_channel to true, so the channel can't have been dropped. + unreachable!() } } else { let preimage_update = ChannelMonitorUpdate { @@ -4079,6 +3917,8 @@ impl ChannelManager ChannelManager) { - let mut outbounds = self.pending_outbound_payments.lock().unwrap(); - let mut pending_events = self.pending_events.lock().unwrap(); - for source in sources.drain(..) { - if let HTLCSource::OutboundRoute { session_priv, payment_id, path, .. 
} = source { - let mut session_priv_bytes = [0; 32]; - session_priv_bytes.copy_from_slice(&session_priv[..]); - if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) { - assert!(payment.get().is_fulfilled()); - if payment.get_mut().remove(&session_priv_bytes, None) { - pending_events.push( - events::Event::PaymentPathSuccessful { - payment_id, - payment_hash: payment.get().payment_hash(), - path, - } - ); - } - } - } - } + fn finalize_claims(&self, sources: Vec) { + self.pending_outbound_payments.finalize_claims(sources, &self.pending_events); } - fn claim_funds_internal(&self, channel_state_lock: MutexGuard::Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { + fn claim_funds_internal(&self, channel_state_lock: MutexGuard, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_id: [u8; 32]) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { mem::drop(channel_state_lock); - let mut session_priv_bytes = [0; 32]; - session_priv_bytes.copy_from_slice(&session_priv[..]); - let mut outbounds = self.pending_outbound_payments.lock().unwrap(); - if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) { - let mut pending_events = self.pending_events.lock().unwrap(); - if !payment.get().is_fulfilled() { - let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let fee_paid_msat = payment.get().get_pending_fee_msat(); - pending_events.push( - events::Event::PaymentSent { - payment_id: Some(payment_id), - payment_preimage, - payment_hash, - fee_paid_msat, - } - ); - payment.get_mut().mark_fulfilled(); - } - - if from_onchain { - // We currently immediately remove HTLCs which were fulfilled on-chain. - // This could potentially lead to removing a pending payment too early, - // with a reorg of one block causing us to re-add the fulfilled payment on - // restart. - // TODO: We should have a second monitor event that informs us of payments - // irrevocably fulfilled. 
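The fulfillment bookkeeping being removed here (now delegated to OutboundPayments) emits PaymentSent once per payment and PaymentPathSuccessful once per claimed path, ignoring duplicate fulfills. A toy sketch of that dedup logic under simplified, hypothetical types:

use std::collections::HashSet;

// Stand-in for the per-payment session tracking (not LDK's real struct).
struct PendingPayment {
    session_privs: HashSet<[u8; 32]>,
    fulfilled: bool,
}

#[derive(Debug, PartialEq)]
enum Event { PaymentSent, PaymentPathSuccessful }

impl PendingPayment {
    // First fulfill emits PaymentSent; each known path then emits
    // PaymentPathSuccessful exactly once; duplicates emit nothing.
    fn claim_path(&mut self, session_priv: [u8; 32], events: &mut Vec<Event>) {
        if !self.fulfilled {
            self.fulfilled = true;
            events.push(Event::PaymentSent);
        }
        if self.session_privs.remove(&session_priv) {
            events.push(Event::PaymentPathSuccessful);
        }
    }
}

fn main() {
    let sp = [7u8; 32];
    let mut payment = PendingPayment { session_privs: HashSet::from([sp]), fulfilled: false };
    let mut events = Vec::new();
    payment.claim_path(sp, &mut events);
    payment.claim_path(sp, &mut events); // duplicate fulfill: no new events
    assert_eq!(events, vec![Event::PaymentSent, Event::PaymentPathSuccessful]);
}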
- if payment.get_mut().remove(&session_priv_bytes, Some(&path)) { - let payment_hash = Some(PaymentHash(Sha256::hash(&payment_preimage.0).into_inner())); - pending_events.push( - events::Event::PaymentPathSuccessful { - payment_id, - payment_hash, - path, - } - ); - } - } - } else { - log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0)); - } + self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; - let res = self.claim_funds_from_hop(channel_state_lock, hop_data, payment_preimage, + let res = self.claim_funds_from_hop(channel_state_lock, self.per_peer_state.read().unwrap(), hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat { @@ -4211,7 +3993,7 @@ impl ChannelManager, - channel: &mut Channel<::Signer>, raa: Option, + channel: &mut Channel<::Signer>, raa: Option, commitment_update: Option, order: RAACommitmentOrder, pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) @@ -4271,22 +4053,41 @@ impl ChannelManager) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let htlc_forwards; let (mut pending_failures, finalized_claims, counterparty_node_id) = { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; - let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) { - hash_map::Entry::Occupied(chan) => chan, - hash_map::Entry::Vacant(_) => return, + let counterparty_node_id = match counterparty_node_id { + Some(cp_id) => cp_id.clone(), + None => { + // TODO: Once we can rely on the counterparty_node_id from the + // monitor event, this and the id_to_peer map should be removed. 
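As the TODO just above notes, a monitor event may not yet carry the counterparty node id, so the code that continues below falls back to the id_to_peer index keyed by channel id. A stand-alone sketch of that fallback (hypothetical signature; LDK's real code operates on its own internal maps):

use std::collections::HashMap;
use std::sync::Mutex;

type NodeId = [u8; 33];
type ChannelId = [u8; 32];

// Prefer the node id delivered with the event; otherwise consult the
// channel-id -> peer index.
fn resolve_counterparty(
    from_event: Option<NodeId>,
    channel_id: ChannelId,
    id_to_peer: &Mutex<HashMap<ChannelId, NodeId>>,
) -> Option<NodeId> {
    match from_event {
        Some(cp_id) => Some(cp_id),
        None => id_to_peer.lock().unwrap().get(&channel_id).copied(),
    }
}

fn main() {
    let id_to_peer = Mutex::new(HashMap::from([([1u8; 32], [2u8; 33])]));
    assert_eq!(resolve_counterparty(None, [1u8; 32], &id_to_peer), Some([2u8; 33]));
}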
+ let id_to_peer = self.id_to_peer.lock().unwrap(); + match id_to_peer.get(&funding_txo.to_channel_id()) { + Some(cp_id) => cp_id.clone(), + None => return, + } + } + }; + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock; + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if let None = peer_state_mutex_opt { return } + peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let mut channel = { + match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ + hash_map::Entry::Occupied(chan) => chan, + hash_map::Entry::Vacant(_) => return, + } }; if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { return; } - let counterparty_node_id = channel.get().get_counterparty_node_id(); let updates = channel.get_mut().monitor_updating_restored(&self.logger, self.get_our_node_id(), self.genesis_hash, self.best_block.read().unwrap().height()); let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() { // We only send a channel_update in the case where we are just now sending a @@ -4301,9 +4102,9 @@ impl ChannelManager ChannelManager { if !channel.get().inbound_is_awaiting_accept() { return Err(APIError::APIMisuseError { err: "The channel isn't currently awaiting to be accepted.".to_owned() }); } - if *counterparty_node_id != channel.get().get_counterparty_node_id() { - return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() }); - } if accept_0conf { channel.get_mut().set_0conf(); } else if channel.get().get_channel_type().requires_zero_conf() { @@ -4382,18 +4187,18 @@ impl ChannelManager { - return Err(APIError::ChannelUnavailable { err: "Can't accept a channel that doesn't exist".to_owned() }); + return Err(APIError::ChannelUnavailable { err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*temporary_channel_id), counterparty_node_id) }); } } Ok(()) @@ -4425,17 +4230,24 @@ impl ChannelManager { self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias); - return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!".to_owned(), msg.temporary_channel_id.clone())) + return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone())) }, hash_map::Entry::Vacant(entry) => { if !self.default_configuration.manually_accept_inbound_channels { if channel.get_channel_type().requires_zero_conf() { return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(user_channel_id), }); @@ -4460,17 +4272,19 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { let (value, output_script, user_id) = { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - match channel_state.by_id.entry(msg.temporary_channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return 
Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); - } try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &their_features), chan); (chan.get().get_value_satoshis(), chan.get().get_funding_redeemscript().to_v0_p2wsh(), chan.get().get_user_id()) }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) } }; let mut pending_events = self.pending_events.lock().unwrap(); @@ -4485,21 +4299,25 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) + } let ((funding_msg, monitor, mut channel_ready), mut chan) = { let best_block = *self.best_block.read().unwrap(); - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - match channel_state.by_id.entry(msg.temporary_channel_id.clone()) { + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); - } (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.keys_manager, &self.logger), chan), chan.remove()) }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) } }; - // Because we have exclusive ownership of the channel here we can release the channel_state + // Because we have exclusive ownership of the channel here we can release the peer_state // lock before watch_channel match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) { ChannelMonitorUpdateStatus::Completed => {}, @@ -4524,9 +4342,12 @@ impl ChannelManager { return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id)) }, @@ -4542,12 +4363,12 @@ impl ChannelManager ChannelManager { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.keys_manager, &self.logger) { Ok(update) => update, Err(e) => try_chan_entry!(self, Err(e), chan), @@ -4585,11 +4411,11 @@ impl ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid()); @@ -4600,16 +4426,20 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } let announcement_sigs_opt = try_chan_entry!(self, chan.get_mut().channel_ready(&msg, self.get_our_node_id(), self.genesis_hash.clone(), &self.best_block.read().unwrap(), &self.logger), chan); if let Some(announcement_sigs) = announcement_sigs_opt { log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(chan.get().channel_id())); - channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { node_id: counterparty_node_id.clone(), msg: announcement_sigs, }); @@ -4621,7 +4451,7 @@ impl ChannelManager ChannelManager Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong 
node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } } @@ -4641,12 +4471,15 @@ impl ChannelManager = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - - match channel_state.by_id.entry(msg.channel_id.clone()) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } if !chan_entry.get().received_shutdown() { log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", @@ -4669,7 +4502,7 @@ impl ChannelManager ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; for htlc_source in dropped_htlcs.drain(..) { @@ -4691,17 +4524,19 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) + } let (tx, chan_option) = { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - match channel_state.by_id.entry(msg.channel_id.clone()) { + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), chan_entry); if let Some(msg) = closing_signed { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { node_id: counterparty_node_id.clone(), msg, }); @@ -4715,7 +4550,7 @@ impl ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; if let Some(broadcast_tx) = tx { @@ -4724,8 +4559,9 @@ impl ChannelManager ChannelManager { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } - let create_pending_htlc_status = |chan: &Channel<::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| { + let create_pending_htlc_status = |chan: &Channel<::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| { // If the update_add is completely bogus, the call will Err and we will close, // but if we've sent a shutdown and they haven't acknowledged it yet, we just // want to reject the new HTLC and fail it backwards instead of forwarding. @@ -4778,23 +4615,26 @@ impl ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } Ok(()) } fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { - let mut channel_lock = self.channel_state.lock().unwrap(); + let channel_lock = self.channel_state.lock().unwrap(); let (htlc_source, forwarded_htlc_value) = { - let channel_state = &mut *channel_lock; - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), chan) }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id); @@ -4802,28 +4642,32 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::from_msg(msg)), chan); }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } Ok(()) } fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } if (msg.failure_code & 0x8000) == 0 { let chan_err: ChannelError = ChannelError::Close("Got update_fail_malformed_htlc with BADONION not set".to_owned()); try_chan_entry!(self, Err(chan_err), chan); @@ -4831,18 +4675,22 @@ impl ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } } fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } let (revoke_and_ack, commitment_signed, monitor_update) = match chan.get_mut().commitment_signed(&msg, &self.logger) { Err((None, e)) => try_chan_entry!(self, Err(e), chan), @@ -4859,12 +4707,12 @@ impl ChannelManager ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } } @@ -4979,11 +4827,15 @@ impl ChannelManager { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update(); let raa_updates = break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); @@ -5007,7 +4859,7 @@ impl ChannelManager ChannelManager break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id); @@ -5040,16 +4892,18 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg, &self.logger), chan); }, - hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } Ok(()) } @@ -5057,17 +4911,20 @@ impl ChannelManager Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - - match channel_state.by_id.entry(msg.channel_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } if !chan.get().is_usable() { return Err(MsgHandleErrInternal::from_no_close(LightningError{err: "Got an announcement_signatures before we were ready for it".to_owned(), action: msgs::ErrorAction::IgnoreError})); } - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { msg: try_chan_entry!(self, chan.get_mut().announcement_signatures( self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height(), msg), chan), // Note that announcement_signatures fails if the channel cannot be announced, @@ -5075,23 +4932,28 @@ impl ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + 
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } Ok(()) } /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err. fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result { - let chan_id = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { - Some((_cp_id, chan_id)) => chan_id.clone(), + let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { + Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { // It's not a local channel return Ok(NotifyOption::SkipPersist) } }; - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - match channel_state.by_id.entry(chan_id) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); + if let None = peer_state_mutex_opt { + return Ok(NotifyOption::SkipPersist) + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(chan_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_counterparty_node_id() != *counterparty_node_id { if chan.get().should_announce() { @@ -5121,12 +4983,16 @@ impl ChannelManager { - if chan.get().get_counterparty_node_id() != *counterparty_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); - } // Currently, we expect all holding cell update_adds to be dropped on peer // disconnect, so Channel's reestablish will never hand us any holding cell // freed HTLCs to fail backwards. If in the future we no longer drop pending @@ -5136,7 +5002,7 @@ impl ChannelManager ChannelManager return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; @@ -5197,32 +5063,47 @@ impl ChannelManager { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; - let by_id = &mut channel_state.by_id; - let pending_msg_events = &mut channel_state.pending_msg_events; - if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) { - let mut chan = remove_channel!(self, chan_entry); - failed_channels.push(chan.force_shutdown(false)); - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + let counterparty_node_id_opt = match counterparty_node_id { + Some(cp_id) => Some(cp_id), + None => { + // TODO: Once we can rely on the counterparty_node_id from the + // monitor event, this and the id_to_peer map should be removed. 
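In the monitor-event handling that continues below, a ClosureReason is picked for the force-closed channel: a failed monitor persist maps to a ProcessingError, anything else reaching that path to CommitmentTxConfirmed. A reduced sketch of that selection (stand-in enums; the real MonitorEvent carries more variants and payloads):

enum MonitorEvent { UpdateFailed, Other }

#[derive(Debug, PartialEq)]
enum ClosureReason { ProcessingError(&'static str), CommitmentTxConfirmed }

fn closure_reason(event: &MonitorEvent) -> ClosureReason {
    match event {
        // A failed monitor persist during chain sync is a processing error...
        MonitorEvent::UpdateFailed =>
            ClosureReason::ProcessingError("Failed to persist ChannelMonitor update during chain sync"),
        // ...anything else on this path means the commitment tx confirmed.
        _ => ClosureReason::CommitmentTxConfirmed,
    }
}

fn main() {
    assert_eq!(closure_reason(&MonitorEvent::Other), ClosureReason::CommitmentTxConfirmed);
}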
+ let id_to_peer = self.id_to_peer.lock().unwrap(); + id_to_peer.get(&funding_outpoint.to_channel_id()).cloned() + } + }; + if let Some(counterparty_node_id) = counterparty_node_id_opt { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) { + let mut chan = remove_channel!(self, chan_entry); + failed_channels.push(chan.force_shutdown(false)); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event { + ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() } + } else { + ClosureReason::CommitmentTxConfirmed + }; + self.issue_channel_close_events(&chan, reason); + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: chan.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } + }, + }); + } } - let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event { - ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() } - } else { - ClosureReason::CommitmentTxConfirmed - }; - self.issue_channel_close_events(&chan, reason); - pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: chan.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } - }, - }); } }, MonitorEvent::Completed { funding_txo, monitor_update_id } => { - self.channel_monitor_updated(&funding_txo, monitor_update_id); + self.channel_monitor_updated(&funding_txo, monitor_update_id, counterparty_node_id.as_ref()); }, } } @@ -5253,45 +5134,49 @@ impl ChannelManager { - if !holding_cell_failed_htlcs.is_empty() { - failed_htlcs.push(( - holding_cell_failed_htlcs, - *channel_id, - chan.get_counterparty_node_id() - )); - } - if let Some((commitment_update, monitor_update)) = commitment_opt { - match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) { - ChannelMonitorUpdateStatus::Completed => { - pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get_counterparty_node_id(), - updates: commitment_update, - }); - }, - e => { - has_monitor_update = true; - let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY); - handle_errors.push((chan.get_counterparty_node_id(), res)); - if close_channel { return false; } - }, + let per_peer_state = self.per_peer_state.read().unwrap(); + + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|channel_id, chan| { + match chan.maybe_free_holding_cell_htlcs(&self.logger) { + Ok((commitment_opt, holding_cell_failed_htlcs)) => { + if 
!holding_cell_failed_htlcs.is_empty() { + failed_htlcs.push(( + holding_cell_failed_htlcs, + *channel_id, + chan.get_counterparty_node_id() + )); + } + if let Some((commitment_update, monitor_update)) = commitment_opt { + match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) { + ChannelMonitorUpdateStatus::Completed => { + pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: chan.get_counterparty_node_id(), + updates: commitment_update, + }); + }, + e => { + has_monitor_update = true; + let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY); + handle_errors.push((chan.get_counterparty_node_id(), res)); + if close_channel { return false; } + }, + } } + true + }, + Err(e) => { + let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); + handle_errors.push((chan.get_counterparty_node_id(), Err(res))); + // ChannelClosed event is generated by handle_error for us + !close_channel } - true - }, - Err(e) => { - let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); - handle_errors.push((chan.get_counterparty_node_id(), Err(res))); - // ChannelClosed event is generated by handle_error for us - !close_channel } - } - }); + }); + } } let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty(); @@ -5315,43 +5200,47 @@ impl ChannelManager { - if let Some(msg) = msg_opt { - has_update = true; - pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: chan.get_counterparty_node_id(), msg, - }); - } - if let Some(tx) = tx_opt { - // We're done with this channel. We got a closing_signed and sent back - // a closing_signed with a closing transaction to broadcast. - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|channel_id, chan| { + match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { + Ok((msg_opt, tx_opt)) => { + if let Some(msg) = msg_opt { + has_update = true; + pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: chan.get_counterparty_node_id(), msg, }); } + if let Some(tx) = tx_opt { + // We're done with this channel. We got a closing_signed and sent back + // a closing_signed with a closing transaction to broadcast. 
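Both retain loops in this region (freeing holding-cell HTLCs above, proposing closing_signed here) share the same per-peer iteration shape: take the per_peer_state read lock, lock each peer's state in turn, and drop channels that error out. A compact sketch of that pattern, with a hypothetical `healthy` flag standing in for the real error handling:

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Channel { healthy: bool } // stand-in

// Walk every peer's channel map under its own mutex and retain only
// channels that are still usable.
fn prune_channels(per_peer_state: &RwLock<HashMap<u64, Mutex<HashMap<[u8; 32], Channel>>>>) {
    let per_peer = per_peer_state.read().unwrap();
    for (_peer, peer_mutex) in per_peer.iter() {
        let mut channels = peer_mutex.lock().unwrap();
        channels.retain(|_chan_id, chan| chan.healthy);
    }
}

fn main() {
    let state = RwLock::new(HashMap::from([(0u64, Mutex::new(HashMap::from([
        ([0u8; 32], Channel { healthy: false }),
    ])))]));
    prune_channels(&state);
    assert!(state.read().unwrap()[&0].lock().unwrap().is_empty());
}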
+ if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } - self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure); + self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure); - log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); - self.tx_broadcaster.broadcast_transaction(&tx); - update_maps_on_chan_removal!(self, chan); - false - } else { true } - }, - Err(e) => { - has_update = true; - let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); - handle_errors.push((chan.get_counterparty_node_id(), Err(res))); - !close_channel + log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); + self.tx_broadcaster.broadcast_transaction(&tx); + update_maps_on_chan_removal!(self, chan); + false + } else { true } + }, + Err(e) => { + has_update = true; + let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); + handle_errors.push((chan.get_counterparty_node_id(), Err(res))); + !close_channel + } } - } - }); + }); + } } for (counterparty_node_id, err) in handle_errors.drain(..) { @@ -5581,10 +5470,15 @@ impl ChannelManager InFlightHtlcs { let mut inflight_htlcs = InFlightHtlcs::new(); - for chan in self.channel_state.lock().unwrap().by_id.values() { - for (htlc_source, _) in chan.inflight_htlc_sources() { - if let HTLCSource::OutboundRoute { path, .. } = htlc_source { - inflight_htlcs.process_path(path, self.get_our_node_id()); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for chan in peer_state.channel_by_id.values() { + for (htlc_source, _) in chan.inflight_htlc_sources() { + if let HTLCSource::OutboundRoute { path, .. } = htlc_source { + inflight_htlcs.process_path(path, self.get_our_node_id()); + } } } } @@ -5608,12 +5502,12 @@ impl ChannelManager bool { - !self.pending_outbound_payments.lock().unwrap().is_empty() + self.pending_outbound_payments.has_pending_payments() } #[cfg(test)] pub fn clear_pending_payments(&self) { - self.pending_outbound_payments.lock().unwrap().clear() + self.pending_outbound_payments.clear_pending_payments() } /// Processes any events asynchronously in the order they were generated since the last call @@ -5650,13 +5544,28 @@ impl ChannelManager MessageSendEventsProvider for ChannelManager - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +impl MessageSendEventsProvider for ChannelManager +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { + /// Returns `MessageSendEvent`s strictly ordered per-peer, in the order they were generated. + /// The returned array will contain `MessageSendEvent`s for different peers if + /// `MessageSendEvent`s to more than one peer exist, but `MessageSendEvent`s to the same peer + /// are always placed next to each other. + /// + /// Note that while `MessageSendEvent`s are strictly ordered per-peer, the peer order for + /// the chunks of `MessageSendEvent`s for different peers is random. I.e.
if the array contains + `MessageSendEvent`s for both `node_a` and `node_b`, the `MessageSendEvent`s for `node_a` + will randomly be placed first or last in the returned array. + /// + /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate` + /// `MessageSendEvent`s are intended to be broadcast to all peers, they will be placed among + /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { @@ -5676,8 +5585,16 @@ impl MessageSendEventsProvider } let mut pending_events = Vec::new(); - let mut channel_state = self.channel_state.lock().unwrap(); - mem::swap(&mut pending_events, &mut channel_state.pending_msg_events); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if peer_state.pending_msg_events.len() > 0 { + let mut peer_pending_events = Vec::new(); + mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events); + pending_events.append(&mut peer_pending_events); + } + } if !pending_events.is_empty() { events.replace(pending_events); @@ -5689,12 +5606,13 @@ impl MessageSendEventsProvider } } -impl EventsProvider for ChannelManager +impl EventsProvider for ChannelManager where - M::Target: chain::Watch<::Signer>, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, + R::Target: Router, L::Target: Logger, { /// Processes events that must be periodically handled.
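The drain in get_and_clear_pending_msg_events above preserves ordering per peer: each peer's queue is emptied contiguously into the combined vector, while the relative order of peers follows map iteration. A minimal sketch with string stand-ins for `MessageSendEvent`:

use std::collections::HashMap;
use std::sync::Mutex;

// Drain every peer's event queue into one vector; events from the same
// peer stay contiguous and in generation order.
fn drain_events(per_peer: &HashMap<u64, Mutex<Vec<String>>>) -> Vec<String> {
    let mut pending = Vec::new();
    for (_peer, queue) in per_peer.iter() {
        let mut q = queue.lock().unwrap();
        if !q.is_empty() {
            pending.append(&mut *q); // leaves the peer's queue empty
        }
    }
    pending
}

fn main() {
    let m = HashMap::from([(1u64, Mutex::new(vec!["a".to_string(), "b".to_string()]))]);
    assert_eq!(drain_events(&m), vec!["a", "b"]);
    assert!(m[&1].lock().unwrap().is_empty());
}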
@@ -5725,12 +5643,13 @@ where } } -impl chain::Listen for ChannelManager +impl chain::Listen for ChannelManager where - M::Target: chain::Watch<::Signer>, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, + R::Target: Router, L::Target: Logger, { fn filtered_block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { @@ -5762,12 +5681,13 @@ where } } -impl chain::Confirm for ChannelManager +impl chain::Confirm for ChannelManager where - M::Target: chain::Watch<::Signer>, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, + R::Target: Router, L::Target: Logger, { fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { @@ -5827,11 +5747,14 @@ where } fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { - let channel_state = self.channel_state.lock().unwrap(); - let mut res = Vec::with_capacity(channel_state.by_id.len()); - for chan in channel_state.by_id.values() { - if let (Some(funding_txo), block_hash) = (chan.get_funding_txo(), chan.get_funding_tx_confirmed_in()) { - res.push((funding_txo.txid, block_hash)); + let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); + for (_cp_id, peer_state_mutex) in self.per_peer_state.read().unwrap().iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for chan in peer_state.channel_by_id.values() { + if let (Some(funding_txo), block_hash) = (chan.get_funding_txo(), chan.get_funding_tx_confirmed_in()) { + res.push((funding_txo.txid, block_hash)); + } } } res @@ -5849,18 +5772,19 @@ where } } -impl ChannelManager +impl ChannelManager where - M::Target: chain::Watch<::Signer>, + M::Target: chain::Watch<::Signer>, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, + R::Target: Router, L::Target: Logger, { /// Calls a function which handles an on-chain event (blocks dis/connected, transactions /// un/confirmed, etc) on each channel, handling any resulting errors or messages generated by /// the function. - fn do_chain_event::Signer>) -> Result<(Option, Vec<(HTLCSource, PaymentHash)>, Option), ClosureReason>> + fn do_chain_event::Signer>) -> Result<(Option, Vec<(HTLCSource, PaymentHash)>, Option), ClosureReason>> (&self, height_opt: Option, f: FN) { // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called // during initialization prior to the chain_monitor being fully configured in some cases. @@ -5871,87 +5795,92 @@ where { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; - channel_state.by_id.retain(|_, channel| { - let res = f(channel); - if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res { - for (source, payment_hash) in timed_out_pending_htlcs.drain(..) 
{ - let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel); - timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data), - HTLCDestination::NextHopChannel { node_id: Some(channel.get_counterparty_node_id()), channel_id: channel.channel_id() })); - } - if let Some(channel_ready) = channel_ready_opt { - send_channel_ready!(self, pending_msg_events, channel, channel_ready); - if channel.is_usable() { - log_trace!(self.logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", log_bytes!(channel.channel_id())); - if let Ok(msg) = self.get_channel_update_for_unicast(channel) { - pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.get_counterparty_node_id(), - msg, - }); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|_, channel| { + let res = f(channel); + if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res { + for (source, payment_hash) in timed_out_pending_htlcs.drain(..) { + let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel); + timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data), + HTLCDestination::NextHopChannel { node_id: Some(channel.get_counterparty_node_id()), channel_id: channel.channel_id() })); + } + if let Some(channel_ready) = channel_ready_opt { + send_channel_ready!(self, pending_msg_events, channel, channel_ready); + if channel.is_usable() { + log_trace!(self.logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", log_bytes!(channel.channel_id())); + if let Ok(msg) = self.get_channel_update_for_unicast(channel) { + pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + node_id: channel.get_counterparty_node_id(), + msg, + }); + } + } else { + log_trace!(self.logger, "Sending channel_ready WITHOUT channel_update for {}", log_bytes!(channel.channel_id())); } - } else { - log_trace!(self.logger, "Sending channel_ready WITHOUT channel_update for {}", log_bytes!(channel.channel_id())); } - } - emit_channel_ready_event!(self, channel); + emit_channel_ready_event!(self, channel); - if let Some(announcement_sigs) = announcement_sigs { - log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(channel.channel_id())); - pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { - node_id: channel.get_counterparty_node_id(), - msg: announcement_sigs, - }); - if let Some(height) = height_opt { - if let Some(announcement) = channel.get_signed_channel_announcement(self.get_our_node_id(), self.genesis_hash, height) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { - msg: announcement, - // Note that announcement_signatures fails if the channel cannot be announced, - // so get_channel_update_for_broadcast will never fail by the time we get here. 
- update_msg: self.get_channel_update_for_broadcast(channel).unwrap(), - }); + if let Some(announcement_sigs) = announcement_sigs { + log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(channel.channel_id())); + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.get_counterparty_node_id(), + msg: announcement_sigs, + }); + if let Some(height) = height_opt { + if let Some(announcement) = channel.get_signed_channel_announcement(self.get_our_node_id(), self.genesis_hash, height) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + msg: announcement, + // Note that announcement_signatures fails if the channel cannot be announced, + // so get_channel_update_for_broadcast will never fail by the time we get here. + update_msg: self.get_channel_update_for_broadcast(channel).unwrap(), + }); + } } } - } - if channel.is_our_channel_ready() { - if let Some(real_scid) = channel.get_short_channel_id() { - // If we sent a 0conf channel_ready, and now have an SCID, we add it - // to the short_to_chan_info map here. Note that we check whether we - // can relay using the real SCID at relay-time (i.e. - // enforce option_scid_alias then), and if the funding tx is ever - // un-confirmed we force-close the channel, ensuring short_to_chan_info - // is always consistent. - let mut short_to_chan_info = self.short_to_chan_info.write().unwrap(); - let scid_insert = short_to_chan_info.insert(real_scid, (channel.get_counterparty_node_id(), channel.channel_id())); - assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.get_counterparty_node_id(), channel.channel_id()), - "SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels", - fake_scid::MAX_SCID_BLOCKS_FROM_NOW); + if channel.is_our_channel_ready() { + if let Some(real_scid) = channel.get_short_channel_id() { + // If we sent a 0conf channel_ready, and now have an SCID, we add it + // to the short_to_chan_info map here. Note that we check whether we + // can relay using the real SCID at relay-time (i.e. + // enforce option_scid_alias then), and if the funding tx is ever + // un-confirmed we force-close the channel, ensuring short_to_chan_info + // is always consistent. + let mut short_to_chan_info = self.short_to_chan_info.write().unwrap(); + let scid_insert = short_to_chan_info.insert(real_scid, (channel.get_counterparty_node_id(), channel.channel_id())); + assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.get_counterparty_node_id(), channel.channel_id()), + "SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels", + fake_scid::MAX_SCID_BLOCKS_FROM_NOW); + } } - } - } else if let Err(reason) = res { - update_maps_on_chan_removal!(self, channel); - // It looks like our counterparty went on-chain or funding transaction was - // reorged out of the main chain. Close the channel. - failed_channels.push(channel.force_shutdown(true)); - if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update + } else if let Err(reason) = res { + update_maps_on_chan_removal!(self, channel); + // It looks like our counterparty went on-chain or funding transaction was + // reorged out of the main chain. Close the channel. 
+ failed_channels.push(channel.force_shutdown(true)); + if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + let reason_message = format!("{}", reason); + self.issue_channel_close_events(channel, reason); + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: channel.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { + channel_id: channel.channel_id(), + data: reason_message, + } }, }); + return false; } - let reason_message = format!("{}", reason); - self.issue_channel_close_events(channel, reason); - pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: channel.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { - channel_id: channel.channel_id(), - data: reason_message, - } }, - }); - return false; - } - true - }); + true + }); + } } if let Some(height) = height_opt { @@ -6049,13 +5978,15 @@ where } } -impl - ChannelMessageHandler for ChannelManager - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +impl + ChannelMessageHandler for ChannelManager +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); @@ -6151,14 +6082,16 @@ impl let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); let mut no_channels_remain = true; + let mut channel_state = self.channel_state.lock().unwrap(); + let mut per_peer_state = self.per_peer_state.write().unwrap(); { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.", log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" }); - channel_state.by_id.retain(|_, chan| { - if chan.get_counterparty_node_id() == *counterparty_node_id { + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|_, chan| { chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); if chan.is_shutdown() { update_maps_on_chan_removal!(self, chan); @@ -6167,37 +6100,39 @@ impl } else { no_channels_remain = false; } - } - true - }); - pending_msg_events.retain(|msg| { - match msg { - &events::MessageSendEvent::SendAcceptChannel { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendOpenChannel { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendFundingCreated { ref node_id, .. 
} => node_id != counterparty_node_id, - &events::MessageSendEvent::SendFundingSigned { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelReady { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendClosingSigned { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelAnnouncement { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, - &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, - &events::MessageSendEvent::SendChannelUpdate { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, - &events::MessageSendEvent::SendShortIdsQuery { .. } => false, - &events::MessageSendEvent::SendReplyChannelRange { .. } => false, - &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, - } - }); + true + }); + pending_msg_events.retain(|msg| { + match msg { + &events::MessageSendEvent::SendAcceptChannel { .. } => false, + &events::MessageSendEvent::SendOpenChannel { .. } => false, + &events::MessageSendEvent::SendFundingCreated { .. } => false, + &events::MessageSendEvent::SendFundingSigned { .. } => false, + &events::MessageSendEvent::SendChannelReady { .. } => false, + &events::MessageSendEvent::SendAnnouncementSignatures { .. } => false, + &events::MessageSendEvent::UpdateHTLCs { .. } => false, + &events::MessageSendEvent::SendRevokeAndACK { .. } => false, + &events::MessageSendEvent::SendClosingSigned { .. } => false, + &events::MessageSendEvent::SendShutdown { .. } => false, + &events::MessageSendEvent::SendChannelReestablish { .. } => false, + &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, + &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, + &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + &events::MessageSendEvent::SendChannelUpdate { .. } => false, + &events::MessageSendEvent::HandleError { .. } => false, + &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, + &events::MessageSendEvent::SendShortIdsQuery { .. } => false, + &events::MessageSendEvent::SendReplyChannelRange { .. } => false, + &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, + } + }); + } + mem::drop(channel_state); } if no_channels_remain { - self.per_peer_state.write().unwrap().remove(counterparty_node_id); + per_peer_state.remove(counterparty_node_id); } + mem::drop(per_peer_state); for failure in failed_channels.drain(..) 
{ self.finish_force_close_channel(failure); @@ -6219,7 +6154,9 @@ impl match peer_state_lock.entry(counterparty_node_id.clone()) { hash_map::Entry::Vacant(e) => { e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), })); }, hash_map::Entry::Occupied(e) => { @@ -6230,35 +6167,41 @@ impl let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; - channel_state.by_id.retain(|_, chan| { - let retain = if chan.get_counterparty_node_id() == *counterparty_node_id { - if !chan.have_received_message() { - // If we created this (outbound) channel while we were disconnected from the - // peer we probably failed to send the open_channel message, which is now - // lost. We can't have had anything pending related to this channel, so we just - // drop it. - false - } else { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), - }); - true - } - } else { true }; - if retain && chan.get_counterparty_node_id() != *counterparty_node_id { - if let Some(msg) = chan.get_signed_channel_announcement(self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height()) { - if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) { - pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement { - node_id: *counterparty_node_id, - msg, update_msg, + let per_peer_state = self.per_peer_state.read().unwrap(); + + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|_, chan| { + let retain = if chan.get_counterparty_node_id() == *counterparty_node_id { + if !chan.have_received_message() { + // If we created this (outbound) channel while we were disconnected from the + // peer we probably failed to send the open_channel message, which is now + // lost. We can't have had anything pending related to this channel, so we just + // drop it. 
+ false + } else { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), }); + true + } + } else { true }; + if retain && chan.get_counterparty_node_id() != *counterparty_node_id { + if let Some(msg) = chan.get_signed_channel_announcement(self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height()) { + if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) { + pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement { + node_id: *counterparty_node_id, + msg, update_msg, + }); + } } } - } - retain - }); + retain + }); + } //TODO: Also re-broadcast announcement_signatures Ok(()) } @@ -6267,22 +6210,30 @@ impl let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); if msg.channel_id == [0; 32] { - for chan in self.list_channels() { - if chan.counterparty.node_id == *counterparty_node_id { - // Untrusted messages from peer, we throw away the error if id points to a non-existent channel - let _ = self.force_close_channel_with_peer(&chan.channel_id, counterparty_node_id, Some(&msg.data), true); - } + let channel_ids: Vec<[u8; 32]> = { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { return; } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + peer_state.channel_by_id.keys().cloned().collect() + }; + for channel_id in channel_ids { + // Untrusted messages from peer, we throw away the error if id points to a non-existent channel + let _ = self.force_close_channel_with_peer(&channel_id, counterparty_node_id, Some(&msg.data), true); } } else { { // First check if we can advance the channel type and try again. 
let mut channel_state = self.channel_state.lock().unwrap(); - if let Some(chan) = channel_state.by_id.get_mut(&msg.channel_id) { - if chan.get_counterparty_node_id() != *counterparty_node_id { - return; - } + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { return; } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) { if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: *counterparty_node_id, msg, }); @@ -6736,12 +6687,14 @@ impl_writeable_tlv_based!(PendingInboundPayment, { (8, min_value_msat, required), }); -impl Writeable for ChannelManager - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +impl Writeable for ChannelManager +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { fn write(&self, writer: &mut W) -> Result<(), io::Error> { let _consistency_lock = self.total_consistency_lock.write().unwrap(); @@ -6756,19 +6709,29 @@ impl Writeable for ChannelMana } { - // Take `channel_state` lock temporarily to avoid creating a lock order that requires - // that the `forward_htlcs` lock is taken after `channel_state` - let channel_state = self.channel_state.lock().unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); let mut unfunded_channels = 0; - for (_, channel) in channel_state.by_id.iter() { - if !channel.is_funding_initiated() { - unfunded_channels += 1; + let mut number_of_channels = 0; + for (_, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + number_of_channels += peer_state.channel_by_id.len(); + for (_, channel) in peer_state.channel_by_id.iter() { + if !channel.is_funding_initiated() { + unfunded_channels += 1; + } } } - ((channel_state.by_id.len() - unfunded_channels) as u64).write(writer)?; - for (_, channel) in channel_state.by_id.iter() { - if channel.is_funding_initiated() { - channel.write(writer)?; + + ((number_of_channels - unfunded_channels) as u64).write(writer)?; + + for (_, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (_, channel) in peer_state.channel_by_id.iter() { + if channel.is_funding_initiated() { + channel.write(writer)?; + } } } } @@ -6787,7 +6750,7 @@ impl Writeable for ChannelMana let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap(); let claimable_payments = self.claimable_payments.lock().unwrap(); - let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap(); + let pending_outbound_payments = self.pending_outbound_payments.pending_outbound_payments.lock().unwrap(); let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new(); (claimable_payments.claimable_htlcs.len() as u64).write(writer)?; @@ -6933,12 +6896,14 @@ impl Writeable for ChannelMana /// which you've already broadcasted the transaction. 
/// /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor -pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref> +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { /// The keys provider which will give us relevant keys. Some keys will be loaded during /// deserialization and KeysInterface::read_chan_signer will be used to read per-Channel @@ -6960,6 +6925,11 @@ pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: /// used to broadcast the latest local commitment transactions of channels which must be /// force-closed during deserialization. pub tx_broadcaster: T, + /// The router which will be used in the ChannelManager in the future for finding routes + /// on-the-fly for trampoline payments. Absent in private nodes that don't support forwarding. + /// + /// No calls to the router will be made during deserialization. + pub router: R, /// The Logger for use in the ChannelManager and which may be used to log information during /// deserialization. pub logger: L, @@ -6979,24 +6949,26 @@ pub struct ChannelManagerReadArgs<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: /// this struct. /// /// (C-not exported) because we have no HashMap bindings - pub channel_monitors: HashMap::Signer>>, + pub channel_monitors: HashMap::Signer>>, } -impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ChannelManagerReadArgs<'a, M, T, K, F, L> - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, - { +impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref> + ChannelManagerReadArgs<'a, M, T, K, F, R, L> +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, +{ /// Simple utility function to create a ChannelManagerReadArgs which creates the monitor /// HashMap for you. This is primarily useful for C bindings where it is not practical to /// populate a HashMap directly from C. 
- pub fn new(keys_manager: K, fee_estimator: F, chain_monitor: M, tx_broadcaster: T, logger: L, default_config: UserConfig, - mut channel_monitors: Vec<&'a mut ChannelMonitor<::Signer>>) -> Self { + pub fn new(keys_manager: K, fee_estimator: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, default_config: UserConfig, + mut channel_monitors: Vec<&'a mut ChannelMonitor<::Signer>>) -> Self { Self { - keys_manager, fee_estimator, chain_monitor, tx_broadcaster, logger, default_config, + keys_manager, fee_estimator, chain_monitor, tx_broadcaster, router, logger, default_config, channel_monitors: channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect() } } @@ -7004,29 +6976,33 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> // Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the // SimpleArcChannelManager type: -impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ReadableArgs> for (BlockHash, Arc>) - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref> + ReadableArgs> for (BlockHash, Arc>) +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { - fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, M, T, K, F, L>) -> Result { - let (blockhash, chan_manager) = <(BlockHash, ChannelManager)>::read(reader, args)?; + fn read(reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, K, F, R, L>) -> Result { + let (blockhash, chan_manager) = <(BlockHash, ChannelManager)>::read(reader, args)?; Ok((blockhash, Arc::new(chan_manager))) } } -impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ReadableArgs> for (BlockHash, ChannelManager) - where M::Target: chain::Watch<::Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref> + ReadableArgs> for (BlockHash, ChannelManager) +where + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { - fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, M, T, K, F, L>) -> Result { + fn read(reader: &mut Reader, mut args: ChannelManagerReadArgs<'a, M, T, K, F, R, L>) -> Result { let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); let genesis_hash: BlockHash = Readable::read(reader)?; @@ -7037,12 +7013,12 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> let channel_count: u64 = Readable::read(reader)?; let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128)); - let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + let mut peer_channels: HashMap::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = Vec::new(); for _ in 0..channel_count { - let mut channel: Channel<::Signer> = Channel::read(reader, (&args.keys_manager, best_block_height))?; + let mut channel: Channel<::Signer> = Channel::read(reader,
(&args.keys_manager, best_block_height))?; let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { @@ -7103,7 +7079,17 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> if channel.is_funding_initiated() { id_to_peer.insert(channel.channel_id(), channel.get_counterparty_node_id()); } - by_id.insert(channel.channel_id(), channel); + match peer_channels.entry(channel.get_counterparty_node_id()) { + hash_map::Entry::Occupied(mut entry) => { + let by_id_map = entry.get_mut(); + by_id_map.insert(channel.channel_id(), channel); + }, + hash_map::Entry::Vacant(entry) => { + let mut by_id_map = HashMap::new(); + by_id_map.insert(channel.channel_id(), channel); + entry.insert(by_id_map); + } + } } } else if channel.is_awaiting_initial_mon_persist() { // If we were persisted and shut down while the initial ChannelMonitor persistence @@ -7158,11 +7144,13 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> } let peer_count: u64 = Readable::read(reader)?; - let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex)>())); + let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex::Signer>>)>())); for _ in 0..peer_count { let peer_pubkey = Readable::read(reader)?; let peer_state = PeerState { + channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, + pending_msg_events: Vec::new(), }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -7254,7 +7242,7 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> // We only rebuild the pending payments map if we were most recently serialized by // 0.0.102+ for (_, monitor) in args.channel_monitors.iter() { - if by_id.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { + if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { for (htlc_source, htlc) in monitor.get_pending_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source { if path.is_empty() { @@ -7402,28 +7390,32 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> } let mut outbound_scid_aliases = HashSet::new(); - for (chan_id, chan) in by_id.iter_mut() { - if chan.outbound_scid_alias() == 0 { - let mut outbound_scid_alias; - loop { - outbound_scid_alias = fake_scid::Namespace::OutboundAlias - .get_fake_scid(best_block_height, &genesis_hash, fake_scid_rand_bytes.as_ref().unwrap(), &args.keys_manager); - if outbound_scid_aliases.insert(outbound_scid_alias) { break; } - } - chan.set_outbound_scid_alias(outbound_scid_alias); - } else if !outbound_scid_aliases.insert(chan.outbound_scid_alias()) { - // Note that in rare cases its possible to hit this while reading an older - // channel if we just happened to pick a colliding outbound alias above. 
- log_error!(args.logger, "Got duplicate outbound SCID alias; {}", chan.outbound_scid_alias()); - return Err(DecodeError::InvalidValue); - } - if chan.is_usable() { - if short_to_chan_info.insert(chan.outbound_scid_alias(), (chan.get_counterparty_node_id(), *chan_id)).is_some() { + for (_peer_node_id, peer_state_mutex) in per_peer_state.iter_mut() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + for (chan_id, chan) in peer_state.channel_by_id.iter_mut() { + if chan.outbound_scid_alias() == 0 { + let mut outbound_scid_alias; + loop { + outbound_scid_alias = fake_scid::Namespace::OutboundAlias + .get_fake_scid(best_block_height, &genesis_hash, fake_scid_rand_bytes.as_ref().unwrap(), &args.keys_manager); + if outbound_scid_aliases.insert(outbound_scid_alias) { break; } + } + chan.set_outbound_scid_alias(outbound_scid_alias); + } else if !outbound_scid_aliases.insert(chan.outbound_scid_alias()) { // Note that in rare cases it's possible to hit this while reading an older // channel if we just happened to pick a colliding outbound alias above. log_error!(args.logger, "Got duplicate outbound SCID alias; {}", chan.outbound_scid_alias()); return Err(DecodeError::InvalidValue); } + if chan.is_usable() { + if short_to_chan_info.insert(chan.outbound_scid_alias(), (chan.get_counterparty_node_id(), *chan_id)).is_some() { + // Note that in rare cases it's possible to hit this while reading an older + // channel if we just happened to pick a colliding outbound alias above. + log_error!(args.logger, "Got duplicate outbound SCID alias; {}", chan.outbound_scid_alias()); + return Err(DecodeError::InvalidValue); + } + } } } @@ -7460,8 +7452,13 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> // without the new monitor persisted - we'll end up right back here on // restart.
let previous_channel_id = claimable_htlc.prev_hop.outpoint.to_channel_id(); - if let Some(channel) = by_id.get_mut(&previous_channel_id) { - channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &args.logger); + if let Some(peer_node_id) = id_to_peer.get(&previous_channel_id){ + let peer_state_mutex = per_peer_state.get(peer_node_id).unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if let Some(channel) = peer_state.channel_by_id.get_mut(&previous_channel_id) { + channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &args.logger); + } } if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) { previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger); @@ -7482,16 +7479,15 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> fee_estimator: bounded_fee_estimator, chain_monitor: args.chain_monitor, tx_broadcaster: args.tx_broadcaster, + router: args.router, best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)), channel_state: Mutex::new(ChannelHolder { - by_id, - pending_msg_events: Vec::new(), }), inbound_payment_key: expanded_inbound_key, pending_inbound_payments: Mutex::new(pending_inbound_payments), - pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), + pending_outbound_payments: OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()) }, pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), @@ -7509,7 +7505,7 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> highest_seen_timestamp: AtomicUsize::new(highest_seen_timestamp as usize), - per_peer_state: RwLock::new(per_peer_state), + per_peer_state: FairRwLock::new(per_peer_state), pending_events: Mutex::new(pending_events_read), pending_background_events: Mutex::new(pending_background_events_read), @@ -7539,18 +7535,25 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; + use bitcoin::hashes::hex::FromHex; + use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; + use bitcoin::secp256k1::ecdsa::Signature; + use bitcoin::secp256k1::ffi::Signature as FFISignature; + use bitcoin::blockdata::script::Script; + use bitcoin::Txid; use core::time::Duration; use core::sync::atomic::Ordering; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; - use crate::ln::channelmanager::{self, inbound_payment, PaymentId, PaymentSendFailure}; + use crate::ln::channelmanager::{self, inbound_payment, PaymentId, PaymentSendFailure, InterceptId}; use crate::ln::functional_test_utils::*; use crate::ln::msgs; - use crate::ln::msgs::ChannelMessageHandler; + use crate::ln::msgs::{ChannelMessageHandler, OptionalField}; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; use crate::util::test_utils; - use crate::chain::keysinterface::KeysInterface; + use crate::util::config::ChannelConfig; + use crate::chain::keysinterface::{EntropySource, KeysInterface}; #[test] fn test_notify_limits() { @@ -7649,7 +7652,7 @@ mod tests { // Use the utility 
function send_payment_along_path to send the payment with MPP data which // indicates there are more HTLCs coming. let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match. - let session_privs = nodes[0].node.add_new_pending_payment(our_payment_hash, Some(payment_secret), payment_id, &mpp_route).unwrap(); + let session_privs = nodes[0].node.test_add_new_pending_payment(our_payment_hash, Some(payment_secret), payment_id, &mpp_route).unwrap(); nodes[0].node.send_payment_along_path(&mpp_route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap(); check_added_monitors!(nodes[0], 1); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); @@ -7867,7 +7870,7 @@ mod tests { final_value_msat: 10_000, final_cltv_expiry_delta: 40, }; - let network_graph = nodes[0].network_graph; + let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); let scorer = test_utils::TestScorer::with_penalty(0); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); @@ -7878,8 +7881,8 @@ mod tests { let test_preimage = PaymentPreimage([42; 32]); let mismatch_payment_hash = PaymentHash([43; 32]); - let session_privs = nodes[0].node.add_new_pending_payment(mismatch_payment_hash, None, PaymentId(mismatch_payment_hash.0), &route).unwrap(); - nodes[0].node.send_payment_internal(&route, mismatch_payment_hash, &None, Some(test_preimage), PaymentId(mismatch_payment_hash.0), None, session_privs).unwrap(); + let session_privs = nodes[0].node.test_add_new_pending_payment(mismatch_payment_hash, None, PaymentId(mismatch_payment_hash.0), &route).unwrap(); + nodes[0].node.test_send_payment_internal(&route, mismatch_payment_hash, &None, Some(test_preimage), PaymentId(mismatch_payment_hash.0), None, session_privs).unwrap(); check_added_monitors!(nodes[0], 1); let updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -7912,7 +7915,7 @@ mod tests { final_value_msat: 10_000, final_cltv_expiry_delta: 40, }; - let network_graph = nodes[0].network_graph; + let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); let scorer = test_utils::TestScorer::with_penalty(0); let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); @@ -7924,8 +7927,8 @@ mod tests { let test_preimage = PaymentPreimage([42; 32]); let test_secret = PaymentSecret([43; 32]); let payment_hash = PaymentHash(Sha256::hash(&test_preimage.0).into_inner()); - let session_privs = nodes[0].node.add_new_pending_payment(payment_hash, Some(test_secret), PaymentId(payment_hash.0), &route).unwrap(); - nodes[0].node.send_payment_internal(&route, payment_hash, &Some(test_secret), Some(test_preimage), PaymentId(payment_hash.0), None, session_privs).unwrap(); + let session_privs = nodes[0].node.test_add_new_pending_payment(payment_hash, Some(test_secret), PaymentId(payment_hash.0), &route).unwrap(); + nodes[0].node.test_send_payment_internal(&route, payment_hash, &Some(test_secret), Some(test_preimage), PaymentId(payment_hash.0), None, session_privs).unwrap(); check_added_monitors!(nodes[0], 1); let updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -8110,13 +8113,201 @@ mod tests { check_closed_event!(nodes[0], 1, ClosureReason::CooperativeClosure); check_closed_event!(nodes[1], 1, 
ClosureReason::CooperativeClosure); } + + fn check_not_connected_to_peer_error(res_err: Result, expected_public_key: PublicKey) { + let expected_message = format!("Not connected to node: {}", expected_public_key); + check_api_misuse_error_message(expected_message, res_err) + } + + fn check_unknown_peer_error(res_err: Result, expected_public_key: PublicKey) { + let expected_message = format!("Can't find a peer matching the passed counterparty node_id {}", expected_public_key); + check_api_misuse_error_message(expected_message, res_err) + } + + fn check_api_misuse_error_message(expected_err_message: String, res_err: Result) { + match res_err { + Err(APIError::APIMisuseError { err }) => { + assert_eq!(err, expected_err_message); + }, + Ok(_) => panic!("Unexpected Ok"), + Err(_) => panic!("Unexpected Error"), + } + } + + #[test] + fn test_api_calls_with_unknown_counterparty_node() { + // Tests that our API functions and message handlers that expect a `counterparty_node_id` + // as input behave as expected if the `counterparty_node_id` is an unknown peer in the + // `ChannelManager::per_peer_state` map. + let chanmon_cfg = create_chanmon_cfgs(2); + let node_cfg = create_node_cfgs(2, &chanmon_cfg); + let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[None, None]); + let nodes = create_network(2, &node_cfg, &node_chanmgr); + + // Boilerplate code to produce `open_channel` and `accept_channel` msgs more densely than + // creating dummy ones. + nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 1_000_000, 500_000_000, 42, None).unwrap(); + let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), channelmanager::provided_init_features(), &open_channel_msg); + let accept_channel_msg = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); + + // Dummy values + let channel_id = [4; 32]; + let signature = Signature::from(unsafe { FFISignature::new() }); + let unknown_public_key = PublicKey::from_secret_key(&Secp256k1::signing_only(), &SecretKey::from_slice(&[42; 32]).unwrap()); + let intercept_id = InterceptId([0; 32]); + + // Dummy msgs + let funding_created_msg = msgs::FundingCreated { + temporary_channel_id: open_channel_msg.temporary_channel_id, + funding_txid: Txid::from_hex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(), + funding_output_index: 0, + signature: signature, + }; + + let funding_signed_msg = msgs::FundingSigned { + channel_id: channel_id, + signature: signature, + }; + + let channel_ready_msg = msgs::ChannelReady { + channel_id: channel_id, + next_per_commitment_point: unknown_public_key, + short_channel_id_alias: None, + }; + + let announcement_signatures_msg = msgs::AnnouncementSignatures { + channel_id: channel_id, + short_channel_id: 0, + node_signature: signature, + bitcoin_signature: signature, + }; + + let channel_reestablish_msg = msgs::ChannelReestablish { + channel_id: channel_id, + next_local_commitment_number: 0, + next_remote_commitment_number: 0, + data_loss_protect: OptionalField::Absent, + }; + + let closing_signed_msg = msgs::ClosingSigned { + channel_id: channel_id, + fee_satoshis: 1000, + signature: signature, + fee_range: None, + }; + + let shutdown_msg = msgs::Shutdown { + channel_id: channel_id, + scriptpubkey: Script::new(), + }; + + let onion_routing_packet = msgs::OnionPacket { + version: 255, + public_key: Ok(unknown_public_key), + hop_data: [1;
20*65], + hmac: [2; 32] + }; + + let update_add_htlc_msg = msgs::UpdateAddHTLC { + channel_id: channel_id, + htlc_id: 0, + amount_msat: 1000000, + payment_hash: PaymentHash([1; 32]), + cltv_expiry: 821716, + onion_routing_packet + }; + + let commitment_signed_msg = msgs::CommitmentSigned { + channel_id: channel_id, + signature: signature, + htlc_signatures: Vec::new(), + }; + + let update_fee_msg = msgs::UpdateFee { + channel_id: channel_id, + feerate_per_kw: 1000, + }; + + let malformed_update_msg = msgs::UpdateFailMalformedHTLC{ + channel_id: channel_id, + htlc_id: 0, + sha256_of_onion: [1; 32], + failure_code: 0x8000, + }; + + let fulfill_update_msg = msgs::UpdateFulfillHTLC{ + channel_id: channel_id, + htlc_id: 0, + payment_preimage: PaymentPreimage([1; 32]), + }; + + let fail_update_msg = msgs::UpdateFailHTLC{ + channel_id: channel_id, + htlc_id: 0, + reason: msgs::OnionErrorPacket { data: Vec::new()}, + }; + + let revoke_and_ack_msg = msgs::RevokeAndACK { + channel_id: channel_id, + per_commitment_secret: [1; 32], + next_per_commitment_point: unknown_public_key, + }; + + // Test the API functions and message handlers. + check_not_connected_to_peer_error(nodes[0].node.create_channel(unknown_public_key, 1_000_000, 500_000_000, 42, None), unknown_public_key); + + nodes[1].node.handle_open_channel(&unknown_public_key, channelmanager::provided_init_features(), &open_channel_msg); + + nodes[0].node.handle_accept_channel(&unknown_public_key, channelmanager::provided_init_features(), &accept_channel_msg); + + check_unknown_peer_error(nodes[0].node.accept_inbound_channel(&open_channel_msg.temporary_channel_id, &unknown_public_key, 42), unknown_public_key); + + nodes[1].node.handle_funding_created(&unknown_public_key, &funding_created_msg); + + nodes[0].node.handle_funding_signed(&unknown_public_key, &funding_signed_msg); + + nodes[0].node.handle_channel_ready(&unknown_public_key, &channel_ready_msg); + + nodes[1].node.handle_announcement_signatures(&unknown_public_key, &announcement_signatures_msg); + + check_unknown_peer_error(nodes[0].node.close_channel(&channel_id, &unknown_public_key), unknown_public_key); + + check_unknown_peer_error(nodes[0].node.force_close_broadcasting_latest_txn(&channel_id, &unknown_public_key), unknown_public_key); + + check_unknown_peer_error(nodes[0].node.force_close_without_broadcasting_txn(&channel_id, &unknown_public_key), unknown_public_key); + + check_unknown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unknown_public_key, 1_000_000), unknown_public_key); + + check_unknown_peer_error(nodes[0].node.update_channel_config(&unknown_public_key, &[channel_id], &ChannelConfig::default()), unknown_public_key); + + nodes[0].node.handle_shutdown(&unknown_public_key, &channelmanager::provided_init_features(), &shutdown_msg); + + nodes[1].node.handle_closing_signed(&unknown_public_key, &closing_signed_msg); + + nodes[0].node.handle_channel_reestablish(&unknown_public_key, &channel_reestablish_msg); + + nodes[1].node.handle_update_add_htlc(&unknown_public_key, &update_add_htlc_msg); + + nodes[1].node.handle_commitment_signed(&unknown_public_key, &commitment_signed_msg); + + nodes[1].node.handle_update_fail_malformed_htlc(&unknown_public_key, &malformed_update_msg); + + nodes[1].node.handle_update_fail_htlc(&unknown_public_key, &fail_update_msg); + + nodes[1].node.handle_update_fulfill_htlc(&unknown_public_key, &fulfill_update_msg); + + nodes[1].node.handle_revoke_and_ack(&unknown_public_key, &revoke_and_ack_msg); + + nodes[1].node.handle_update_fee(&unknown_public_key,
&update_fee_msg); + } } #[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))] pub mod bench { use crate::chain::Listen; use crate::chain::chainmonitor::{ChainMonitor, Persist}; - use crate::chain::keysinterface::{KeysManager, KeysInterface, InMemorySigner}; + use crate::chain::keysinterface::{EntropySource, KeysManager, KeysInterface, InMemorySigner}; use crate::ln::channelmanager::{self, BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{ChannelMessageHandler, Init}; @@ -8140,7 +8331,8 @@ pub mod bench { &'a test_utils::TestBroadcaster, &'a test_utils::TestFeeEstimator, &'a test_utils::TestLogger, &'a P>, &'a test_utils::TestBroadcaster, &'a KeysManager, - &'a test_utils::TestFeeEstimator, &'a test_utils::TestLogger>, + &'a test_utils::TestFeeEstimator, &'a test_utils::TestRouter<'a>, + &'a test_utils::TestLogger>, } #[cfg(test)] @@ -8158,15 +8350,16 @@ pub mod bench { let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; + let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); + let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(genesis_hash, &logger_a))); let mut config: UserConfig = Default::default(); config.channel_handshake_config.minimum_depth = 1; - let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a); let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42); - let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &logger_a, &keys_manager_a, config.clone(), ChainParameters { + let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_genesis(network), }); @@ -8176,7 +8369,7 @@ pub mod bench { let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42); - let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &logger_b, &keys_manager_b, config.clone(), ChainParameters { + let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_genesis(network), });
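// Editor's note -- a hedged, hypothetical sketch (not part of the diff) of threading the new
// `router` argument through deserialization, mirroring the extra parameter that
// `ChannelManager::new` takes in the bench code above. The `my_*` values, `monitors`, and
// `reader` are placeholders; the argument order of `ChannelManagerReadArgs::new` and the
// tuple-read pattern are taken from the hunks above, and per the field docs no routing calls
// are made during the read itself.
let read_args = ChannelManagerReadArgs::new(
    &my_keys_manager, &my_fee_estimator, &my_chain_monitor, &my_tx_broadcaster,
    &my_router, // the parameter added in this diff
    &my_logger, UserConfig::default(), monitors,
);
let (block_hash, channel_manager) =
    <(BlockHash, ChannelManager<_, _, _, _, _, _>)>::read(&mut reader, read_args)?;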