use core::{cmp, mem};
use core::cell::RefCell;
use crate::io::Read;
-use crate::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, FairRwLock};
+use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::time::Duration;
use core::ops::Deref;
pending_claiming_payments: HashMap<PaymentHash, ClaimingPayment>,
}
-// Note this is only exposed in cfg(test):
-pub(super) struct ChannelHolder {
-}
-
/// Events which we process internally but cannot be procsesed immediately at the generation site
/// for some reason. They are handled in timer_tick_occurred, so may be processed with
/// quite some time lag.
// | |
// | |__`pending_outbound_payments` // This field's struct contains a map of pending outbounds
// | |
-// | |__`channel_state`
+// | |__`per_peer_state`
// | |
-// | |__`per_peer_state`
+// | |__`peer_state`
// | |
-// | |__`peer_state`
-// | |
-// | |__`id_to_peer`
-// | |
-// | |__`short_to_chan_info`
-// | |
-// | |__`outbound_scid_aliases`
-// | |
-// | |__`best_block`
+// | |__`id_to_peer`
+// | |
+// | |__`short_to_chan_info`
+// | |
+// | |__`outbound_scid_aliases`
+// | |
+// | |__`best_block`
+// | |
+// | |__`pending_events`
// | |
-// | |__`pending_events`
-// | |
-// | |__`pending_background_events`
+// | |__`pending_background_events`
//
pub struct ChannelManager<M: Deref, T: Deref, K: Deref, F: Deref, R: Deref, L: Deref>
where
best_block: RwLock<BestBlock>,
secp_ctx: Secp256k1<secp256k1::All>,
- /// See `ChannelManager` struct-level documentation for lock order requirements.
- #[cfg(any(test, feature = "_test_utils"))]
- pub(super) channel_state: Mutex<ChannelHolder>,
- #[cfg(not(any(test, feature = "_test_utils")))]
- channel_state: Mutex<ChannelHolder>,
-
/// Storage for PaymentSecrets and any requirements on future inbound payments before we will
/// expose them to users via a PaymentClaimable event. HTLCs which do not meet the requirements
/// here are failed when we process them as pending-forwardable-HTLCs, and entries are removed
{
// In testing, ensure there are no deadlocks where the lock is already held upon
// entering the macro.
- assert!($self.channel_state.try_lock().is_ok());
assert!($self.pending_events.try_lock().is_ok());
#[cfg(feature = "std")]
{
best_block: RwLock::new(params.best_block),
- channel_state: Mutex::new(ChannelHolder{
- }),
outbound_scid_aliases: Mutex::new(HashSet::new()),
pending_inbound_payments: Mutex::new(HashMap::new()),
pending_outbound_payments: OutboundPayments::new(),
// We want to make sure the lock is actually acquired by PersistenceNotifierGuard.
debug_assert!(&self.total_consistency_lock.try_write().is_err());
- let mut channel_state = self.channel_state.lock().unwrap();
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(&their_network_key);
let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
let result: Result<(), _> = loop {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
};
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
if let None = peer_state_mutex_opt {
fn funding_transaction_generated_intern<FundingOutput: Fn(&Channel<<K::Target as SignerProvider>::Signer>, &Transaction) -> Result<OutPoint, APIError>>(
&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
) -> Result<(), APIError> {
- let mut channel_state = self.channel_state.lock().unwrap();
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
node_id: chan.get_counterparty_node_id(),
msg,
});
- mem::drop(channel_state);
match peer_state.channel_by_id.entry(chan.channel_id()) {
hash_map::Entry::Occupied(_) => {
panic!("Generated duplicate funding txid?");
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(
&self.total_consistency_lock, &self.persistence_notifier,
);
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
let mut timed_out_mpp_htlcs = Vec::new();
{
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
#[cfg(all(debug_assertions, feature = "std"))]
{
- // Ensure that the `channel_state` and no peer state channel storage lock is not held
- // when calling this function.
+ // Ensure that no peer state channel storage lock is not held when calling this
+ // function.
// This ensures that future code doesn't introduce a lock_order requirement for
- // `forward_htlcs` to be locked after the `channel_state` and `per_peer_state` locks,
- // which calling this function with the locks aquired would.
- assert!(self.channel_state.try_lock().is_ok());
+ // `forward_htlcs` to be locked after the `per_peer_state` locks, which calling this
+ // function with the `per_peer_state` aquired would.
assert!(self.per_peer_state.try_write().is_ok());
}
let mut expected_amt_msat = None;
let mut valid_mpp = true;
let mut errs = Vec::new();
- let mut channel_state = Some(self.channel_state.lock().unwrap());
let mut per_peer_state = Some(self.per_peer_state.read().unwrap());
for htlc in sources.iter() {
let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) {
claimable_amt_msat += htlc.value;
}
if sources.is_empty() || expected_amt_msat.is_none() {
- mem::drop(channel_state);
mem::drop(per_peer_state);
self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
log_info!(self.logger, "Attempted to claim an incomplete payment which no longer had any available HTLCs!");
return;
}
if claimable_amt_msat != expected_amt_msat.unwrap() {
- mem::drop(channel_state);
mem::drop(per_peer_state);
self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
log_info!(self.logger, "Attempted to claim an incomplete payment, expected {} msat, had {} available to claim.",
}
if valid_mpp {
for htlc in sources.drain(..) {
- if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
if per_peer_state.is_none() { per_peer_state = Some(self.per_peer_state.read().unwrap()); }
- if let Err((pk, err)) = self.claim_funds_from_hop(channel_state.take().unwrap(), per_peer_state.take().unwrap(),
+ if let Err((pk, err)) = self.claim_funds_from_hop(per_peer_state.take().unwrap(),
htlc.prev_hop, payment_preimage,
|_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
{
}
}
}
- mem::drop(channel_state);
mem::drop(per_peer_state);
if !valid_mpp {
for htlc in sources.drain(..) {
}
fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
- mut channel_state_lock: MutexGuard<ChannelHolder>,
per_peer_state_lock: RwLockReadGuard<HashMap<PublicKey, Mutex<PeerState<<K::Target as SignerProvider>::Signer>>>>,
prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!
let chan_id = prev_hop.outpoint.to_channel_id();
- let channel_state = &mut *channel_state_lock;
let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
"Failed to update channel monitor with preimage {:?}: {:?}",
payment_preimage, e);
let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
- mem::drop(channel_state_lock);
mem::drop(peer_state_opt);
mem::drop(per_peer_state_lock);
self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
}
});
}
- mem::drop(channel_state_lock);
mem::drop(peer_state_opt);
mem::drop(per_peer_state_lock);
self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
if drop {
chan.remove_entry();
}
- mem::drop(channel_state_lock);
mem::drop(peer_state_opt);
mem::drop(per_peer_state_lock);
self.handle_monitor_update_completion_actions(completion_action(None));
log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
payment_preimage, update_res);
}
- mem::drop(channel_state_lock);
mem::drop(peer_state_opt);
mem::drop(per_peer_state_lock);
// Note that we do process the completion action here. This totally could be a
self.pending_outbound_payments.finalize_claims(sources, &self.pending_events);
}
- fn claim_funds_internal(&self, channel_state_lock: MutexGuard<ChannelHolder>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
+ fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
- mem::drop(channel_state_lock);
self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
},
HTLCSource::PreviousHopData(hop_data) => {
let prev_outpoint = hop_data.outpoint;
- let res = self.claim_funds_from_hop(channel_state_lock, self.per_peer_state.read().unwrap(), hop_data, payment_preimage,
+ let res = self.claim_funds_from_hop(self.per_peer_state.read().unwrap(), hop_data, payment_preimage,
|htlc_claim_value_msat| {
if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
let htlc_forwards;
let (mut pending_failures, finalized_claims, counterparty_node_id) = {
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
let counterparty_node_id = match counterparty_node_id {
Some(cp_id) => cp_id.clone(),
None => {
fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
},
Ok(res) => res
};
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
}
fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
let funding_tx = {
let best_block = *self.best_block.read().unwrap();
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
}
fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
fn internal_shutdown(&self, counterparty_node_id: &PublicKey, their_features: &InitFeatures, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
let result: Result<(), _> = loop {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
}
fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> {
- let channel_lock = self.channel_state.lock().unwrap();
let (htlc_source, forwarded_htlc_value) = {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
};
- self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
+ self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
Ok(())
}
}
fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
let mut htlcs_to_fail = Vec::new();
let res = loop {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
}
fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt {
fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
let htlc_forwards;
let need_lnd_workaround = {
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
- self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
+ self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
},
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
MonitorEvent::UpdateFailed(funding_outpoint) => {
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
let counterparty_node_id_opt = match counterparty_node_id {
Some(cp_id) => Some(cp_id),
None => {
let mut failed_htlcs = Vec::new();
let mut handle_errors = Vec::new();
{
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
let mut has_update = false;
{
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
let mut failed_channels = Vec::new();
let mut timed_out_htlcs = Vec::new();
{
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
let mut failed_channels = Vec::new();
let mut no_channels_remain = true;
- let mut channel_state = self.channel_state.lock().unwrap();
let mut per_peer_state = self.per_peer_state.write().unwrap();
{
log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.",
}
});
}
- mem::drop(channel_state);
}
if no_channels_remain {
per_peer_state.remove(counterparty_node_id);
}
}
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
} else {
{
// First check if we can advance the channel type and try again.
- let mut channel_state = self.channel_state.lock().unwrap();
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
if let None = peer_state_mutex_opt { return; }
best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)),
- channel_state: Mutex::new(ChannelHolder {
- }),
inbound_payment_key: expanded_inbound_key,
pending_inbound_payments: Mutex::new(pending_inbound_payments),
pending_outbound_payments: OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()) },