use crate::util::logger::{Level, Logger};
use crate::util::errors::APIError;
-use alloc::collections::BTreeMap;
+use alloc::collections::{btree_map, BTreeMap};
use crate::io;
use crate::prelude::*;
use core::ops::Deref;
// Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
+pub use crate::ln::outbound_payment::{PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
use crate::ln::script::ShutdownScript;
// We hold various information about HTLC relay in the HTLC objects in Channel itself:
}
/// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, Hash, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub(crate) struct HTLCPreviousHopData {
// Note that this may be an outbound SCID alias for the associated channel.
short_channel_id: u64,
}
}
-/// A payment identifier used to uniquely identify a payment to LDK.
+/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+/// a payment and ensure idempotency in LDK.
///
/// This is not exported to bindings users as we just use [u8; 32] directly
#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
}
}
-#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
/// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`].
pub(crate) enum SentHTLCId {
PreviousHopData { short_channel_id: u64, htlc_id: u64 },
/// Tracks the inbound corresponding to an outbound HTLC
#[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
-#[derive(Clone, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum HTLCSource {
PreviousHopData(HTLCPreviousHopData),
OutboundRoute {
}
impl RAAMonitorUpdateBlockingAction {
- #[allow(unused)]
fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
Self::ForwardedPaymentInboundClaim {
channel_id: prev_hop.outpoint.to_channel_id(),
&'g L
>;
-macro_rules! define_test_pub_trait { ($vis: vis) => {
-/// A trivial trait which describes any [`ChannelManager`] used in testing.
-$vis trait AChannelManager {
+/// A trivial trait which describes any [`ChannelManager`].
+pub trait AChannelManager {
+ /// A type implementing [`chain::Watch`].
type Watch: chain::Watch<Self::Signer> + ?Sized;
+ /// A type that may be dereferenced to [`Self::Watch`].
type M: Deref<Target = Self::Watch>;
+ /// A type implementing [`BroadcasterInterface`].
type Broadcaster: BroadcasterInterface + ?Sized;
+ /// A type that may be dereferenced to [`Self::Broadcaster`].
type T: Deref<Target = Self::Broadcaster>;
+ /// A type implementing [`EntropySource`].
type EntropySource: EntropySource + ?Sized;
+ /// A type that may be dereferenced to [`Self::EntropySource`].
type ES: Deref<Target = Self::EntropySource>;
+ /// A type implementing [`NodeSigner`].
type NodeSigner: NodeSigner + ?Sized;
+ /// A type that may be dereferenced to [`Self::NodeSigner`].
type NS: Deref<Target = Self::NodeSigner>;
+ /// A type implementing [`WriteableEcdsaChannelSigner`].
type Signer: WriteableEcdsaChannelSigner + Sized;
+ /// A type implementing [`SignerProvider`] for [`Self::Signer`].
type SignerProvider: SignerProvider<Signer = Self::Signer> + ?Sized;
+ /// A type that may be dereferenced to [`Self::SignerProvider`].
type SP: Deref<Target = Self::SignerProvider>;
+ /// A type implementing [`FeeEstimator`].
type FeeEstimator: FeeEstimator + ?Sized;
+ /// A type that may be dereferenced to [`Self::FeeEstimator`].
type F: Deref<Target = Self::FeeEstimator>;
+ /// A type implementing [`Router`].
type Router: Router + ?Sized;
+ /// A type that may be dereferenced to [`Self::Router`].
type R: Deref<Target = Self::Router>;
+ /// A type implementing [`Logger`].
type Logger: Logger + ?Sized;
+ /// A type that may be dereferenced to [`Self::Logger`].
type L: Deref<Target = Self::Logger>;
+ /// Returns a reference to the actual [`ChannelManager`] object.
fn get_cm(&self) -> &ChannelManager<Self::M, Self::T, Self::ES, Self::NS, Self::SP, Self::F, Self::R, Self::L>;
}
-} }
-#[cfg(any(test, feature = "_test_utils"))]
-define_test_pub_trait!(pub);
-#[cfg(not(any(test, feature = "_test_utils")))]
-define_test_pub_trait!(pub(crate));
+
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> AChannelManager
for ChannelManager<M, T, ES, NS, SP, F, R, L>
where
/// called [`funding_transaction_generated`] for outbound channels) being closed.
///
/// Note that you can be a bit lazier about writing out `ChannelManager` than you can be with
-/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST write each monitor update out to disk before
-/// returning from [`chain::Watch::watch_channel`]/[`update_channel`], with ChannelManagers, writing updates
-/// happens out-of-band (and will prevent any other `ChannelManager` operations from occurring during
-/// the serialization process). If the deserialized version is out-of-date compared to the
-/// [`ChannelMonitor`] passed by reference to [`read`], those channels will be force-closed based on the
-/// `ChannelMonitor` state and no funds will be lost (mod on-chain transaction fees).
+/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST durably write each
+/// [`ChannelMonitorUpdate`] before returning from
+/// [`chain::Watch::watch_channel`]/[`update_channel`] or before completing async writes. With
+/// `ChannelManager`s, writing updates happens out-of-band (and will prevent any other
+/// `ChannelManager` operations from occurring during the serialization process). If the
+/// deserialized version is out-of-date compared to the [`ChannelMonitor`] passed by reference to
+/// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds
+/// will be lost (modulo on-chain transaction fees).
///
/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which
/// tells you the last block hash which was connected. You should get the best block tip before using the manager.
/// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
/// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,
+ /// Tracks the progress of channels going through batch funding by whether funding_signed was
+ /// received and the monitor has been persisted.
+ ///
+ /// This information does not need to be persisted as funding nodes can forget
+ /// unfunded channels upon disconnection.
+ funding_batch_states: Mutex<BTreeMap<Txid, Vec<(ChannelId, PublicKey, bool)>>>,
background_events_processed_since_startup: AtomicBool,
}
/// Details of a channel, as returned by [`ChannelManager::list_channels`] and [`ChannelManager::list_usable_channels`]
-///
-/// Balances of a channel are available through [`ChainMonitor::get_claimable_balances`] and
-/// [`ChannelMonitor::get_claimable_balances`], calculated with respect to the corresponding on-chain
-/// transactions.
-///
-/// [`ChainMonitor::get_claimable_balances`]: crate::chain::chainmonitor::ChainMonitor::get_claimable_balances
#[derive(Clone, Debug, PartialEq)]
pub struct ChannelDetails {
/// The channel's ID (prior to funding transaction generation, this is a random 32 bytes,
///
/// This value will be `None` for objects serialized with LDK versions prior to 0.0.115.
pub feerate_sat_per_1000_weight: Option<u32>,
+ /// Our total balance. This is the amount we would get if we close the channel.
+ /// This value is not exact. Due to various in-flight changes and feerate changes, exactly this
+ /// amount is not likely to be recoverable on close.
+ ///
+ /// This does not include any pending HTLCs which are not yet fully resolved (and, thus, whose
+ /// balance is not available for inclusion in new outbound HTLCs). This further does not include
+ /// any pending outgoing HTLCs which are awaiting some other resolution to be sent.
+ /// This does not consider any on-chain fees.
+ ///
+ /// See also [`ChannelDetails::outbound_capacity_msat`]
+ pub balance_msat: u64,
/// The available outbound capacity for sending HTLCs to the remote peer. This does not include
/// any pending HTLCs which are not yet fully resolved (and, thus, whose balance is not
/// available for inclusion in new outbound HTLCs). This further does not include any pending
/// outgoing HTLCs which are awaiting some other resolution to be sent.
///
+ /// See also [`ChannelDetails::balance_msat`]
+ ///
/// This value is not exact. Due to various in-flight changes, feerate changes, and our
/// conflict-avoidance policy, exactly this amount is not likely to be spendable. However, we
/// should be able to spend nearly this amount.
/// the current state and per-HTLC limit(s). This is intended for use when routing, allowing us
/// to use a limit as close as possible to the HTLC limit we can currently send.
///
- /// See also [`ChannelDetails::next_outbound_htlc_minimum_msat`] and
- /// [`ChannelDetails::outbound_capacity_msat`].
+ /// See also [`ChannelDetails::next_outbound_htlc_minimum_msat`],
+ /// [`ChannelDetails::balance_msat`], and [`ChannelDetails::outbound_capacity_msat`].
pub next_outbound_htlc_limit_msat: u64,
/// The minimum value for sending a single HTLC to the remote peer. This is the equivalent of
/// [`ChannelDetails::next_outbound_htlc_limit_msat`] but represents a lower-bound, rather than
channel_value_satoshis: context.get_value_satoshis(),
feerate_sat_per_1000_weight: Some(context.get_feerate_sat_per_1000_weight()),
unspendable_punishment_reserve: to_self_reserve_satoshis,
+ balance_msat: balance.balance_msat,
inbound_capacity_msat: balance.inbound_capacity_msat,
outbound_capacity_msat: balance.outbound_capacity_msat,
next_outbound_htlc_limit_msat: balance.next_outbound_htlc_limit_msat,
pub enum RecentPaymentDetails {
/// When an invoice was requested and thus a payment has not yet been sent.
AwaitingInvoice {
- /// Identifier for the payment to ensure idempotency.
+ /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+ /// a payment and ensure idempotency in LDK.
payment_id: PaymentId,
},
/// When a payment is still being sent and awaiting successful delivery.
Pending {
+ /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+ /// a payment and ensure idempotency in LDK.
+ payment_id: PaymentId,
/// Hash of the payment that is currently being sent but has yet to be fulfilled or
/// abandoned.
payment_hash: PaymentHash,
/// been resolved. Upon receiving [`Event::PaymentSent`], we delay for a few minutes before the
/// payment is removed from tracking.
Fulfilled {
+ /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+ /// a payment and ensure idempotency in LDK.
+ payment_id: PaymentId,
/// Hash of the payment that was claimed. `None` for serializations of [`ChannelManager`]
/// made before LDK version 0.0.104.
payment_hash: Option<PaymentHash>,
/// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all
/// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated.
Abandoned {
+ /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+ /// a payment and ensure idempotency in LDK.
+ payment_id: PaymentId,
/// Hash of the payment that we have given up trying to send.
payment_hash: PaymentHash,
},
let mut msg_events = Vec::with_capacity(2);
if let Some((shutdown_res, update_option)) = shutdown_finish {
- $self.finish_force_close_channel(shutdown_res);
+ $self.finish_close_channel(shutdown_res);
if let Some(update) = update_option {
msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
}
let channel_id = $chan.context.channel_id();
+ let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid();
core::mem::drop($peer_state_lock);
core::mem::drop($per_peer_state_lock);
+ // If the channel belongs to a batch funding transaction, the progress of the batch
+ // should be updated as we have received funding_signed and persisted the monitor.
+ if let Some(txid) = unbroadcasted_batch_funding_txid {
+ let mut funding_batch_states = $self.funding_batch_states.lock().unwrap();
+ let mut batch_completed = false;
+ if let Some(batch_state) = funding_batch_states.get_mut(&txid) {
+ let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| (
+ *chan_id == channel_id &&
+ *pubkey == counterparty_node_id
+ ));
+ if let Some(channel_state) = channel_state {
+ channel_state.2 = true;
+ } else {
+ debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update");
+ }
+ batch_completed = batch_state.iter().all(|(_, _, completed)| *completed);
+ } else {
+ debug_assert!(false, "Missing batch state for channel which completed initial monitor update");
+ }
+
+ // When all channels in a batched funding transaction have become ready, it is not necessary
+ // to track the progress of the batch anymore and the state of the channels can be updated.
+ if batch_completed {
+ let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten();
+ let per_peer_state = $self.per_peer_state.read().unwrap();
+ let mut batch_funding_tx = None;
+ for (channel_id, counterparty_node_id, _) in removed_batch_state {
+ if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+ let mut peer_state = peer_state_mutex.lock().unwrap();
+ if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) {
+ batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding());
+ chan.set_batch_ready();
+ let mut pending_events = $self.pending_events.lock().unwrap();
+ emit_channel_pending_event!(pending_events, chan);
+ }
+ }
+ }
+ if let Some(tx) = batch_funding_tx {
+ log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid());
+ $self.tx_broadcaster.broadcast_transactions(&[&tx]);
+ }
+ }
+ }
+
$self.handle_monitor_update_completion_actions(update_actions);
if let Some(forwards) = htlc_forwards {
}
macro_rules! handle_new_monitor_update {
- ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { {
- // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
- // any case so that it won't deadlock.
- debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
+ ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { {
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
match $update_res {
+ ChannelMonitorUpdateStatus::UnrecoverableError => {
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+ log_error!($self.logger, "{}", err_str);
+ panic!("{}", err_str);
+ },
ChannelMonitorUpdateStatus::InProgress => {
log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
&$chan.context.channel_id());
- Ok(false)
- },
- ChannelMonitorUpdateStatus::PermanentFailure => {
- log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
- &$chan.context.channel_id());
- update_maps_on_chan_removal!($self, &$chan.context);
- let res = Err(MsgHandleErrInternal::from_finish_shutdown(
- "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(),
- $chan.context.get_user_id(), $chan.context.force_shutdown(false),
- $self.get_channel_update_for_broadcast(&$chan).ok(), $chan.context.get_value_satoshis()));
- $remove;
- res
+ false
},
ChannelMonitorUpdateStatus::Completed => {
$completed;
- Ok(true)
+ true
},
}
} };
- ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => {
- handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
- $per_peer_state_lock, $chan, _internal, $remove,
+ ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => {
+ handle_new_monitor_update!($self, $update_res, $chan, _internal,
handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan))
};
- ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => {
- if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() {
- handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
- $per_peer_state_lock, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { $chan_entry.remove() })
- } else {
- // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to
- // update).
- debug_assert!(false);
- let channel_id = *$chan_entry.key();
- let (_, err) = convert_chan_phase_err!($self, ChannelError::Close(
- "Cannot update monitor for unfunded channels as they don't have monitors yet".into()),
- $chan_entry.get_mut(), &channel_id);
- $chan_entry.remove();
- Err(err)
- }
- };
- ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+ ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
.or_insert_with(Vec::new);
// During startup, we push monitor updates as background events through to here in
in_flight_updates.len() - 1
});
let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]);
- handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state,
- $per_peer_state_lock, $chan, _internal, $remove,
+ handle_new_monitor_update!($self, update_res, $chan, _internal,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
}
})
} };
- ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
- if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() {
- handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state,
- $per_peer_state_lock, chan, MANUALLY_REMOVING, { $chan_entry.remove() })
- } else {
- // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to
- // update).
- debug_assert!(false);
- let channel_id = *$chan_entry.key();
- let (_, err) = convert_chan_phase_err!($self, ChannelError::Close(
- "Cannot update monitor for unfunded channels as they don't have monitors yet".into()),
- $chan_entry.get_mut(), &channel_id);
- $chan_entry.remove();
- Err(err)
- }
- }
}
macro_rules! process_events_body {
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
-
event_persist_notifier: Notifier::new(),
needs_persist_flag: AtomicBool::new(false),
+ funding_batch_states: Mutex::new(BTreeMap::new()),
entropy_source,
node_signer,
},
PendingOutboundPayment::Retryable { payment_hash, total_msat, .. } => {
Some(RecentPaymentDetails::Pending {
+ payment_id: *payment_id,
payment_hash: *payment_hash,
total_msat: *total_msat,
})
},
PendingOutboundPayment::Abandoned { payment_hash, .. } => {
- Some(RecentPaymentDetails::Abandoned { payment_hash: *payment_hash })
+ Some(RecentPaymentDetails::Abandoned { payment_id: *payment_id, payment_hash: *payment_hash })
},
PendingOutboundPayment::Fulfilled { payment_hash, .. } => {
- Some(RecentPaymentDetails::Fulfilled { payment_hash: *payment_hash })
+ Some(RecentPaymentDetails::Fulfilled { payment_id: *payment_id, payment_hash: *payment_hash })
},
PendingOutboundPayment::Legacy { .. } => None
})
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
- let result: Result<(), _> = loop {
- {
- let per_peer_state = self.per_peer_state.read().unwrap();
+ let mut shutdown_result = None;
+ loop {
+ let per_peer_state = self.per_peer_state.read().unwrap();
- let peer_state_mutex = per_peer_state.get(counterparty_node_id)
- .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+ let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+ .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
- match peer_state.channel_by_id.entry(channel_id.clone()) {
- hash_map::Entry::Occupied(mut chan_phase_entry) => {
- if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
- let funding_txo_opt = chan.context.get_funding_txo();
- let their_features = &peer_state.latest_features;
- let (shutdown_msg, mut monitor_update_opt, htlcs) =
- chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
- failed_htlcs = htlcs;
+ match peer_state.channel_by_id.entry(channel_id.clone()) {
+ hash_map::Entry::Occupied(mut chan_phase_entry) => {
+ if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+ let funding_txo_opt = chan.context.get_funding_txo();
+ let their_features = &peer_state.latest_features;
+ let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
+ let (shutdown_msg, mut monitor_update_opt, htlcs) =
+ chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
+ failed_htlcs = htlcs;
+
+ // We can send the `shutdown` message before updating the `ChannelMonitor`
+ // here as we don't need the monitor update to complete until we send a
+ // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+ peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+ node_id: *counterparty_node_id,
+ msg: shutdown_msg,
+ });
- // We can send the `shutdown` message before updating the `ChannelMonitor`
- // here as we don't need the monitor update to complete until we send a
- // `shutdown_signed`, which we'll delay if we're pending a monitor update.
- peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
- node_id: *counterparty_node_id,
- msg: shutdown_msg,
- });
+ debug_assert!(monitor_update_opt.is_none() || !chan.is_shutdown(),
+ "We can't both complete shutdown and generate a monitor update");
- // Update the monitor with the shutdown script if necessary.
- if let Some(monitor_update) = monitor_update_opt.take() {
- break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ());
- }
+ // Update the monitor with the shutdown script if necessary.
+ if let Some(monitor_update) = monitor_update_opt.take() {
+ handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+ peer_state_lock, peer_state, per_peer_state, chan);
+ break;
+ }
- if chan.is_shutdown() {
- if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) {
- if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) {
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: channel_update
- });
- }
- self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
+ if chan.is_shutdown() {
+ if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) {
+ if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) {
+ peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: channel_update
+ });
}
+ self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
+ shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
}
- break Ok(());
}
- },
- hash_map::Entry::Vacant(_) => (),
- }
+ break;
+ }
+ },
+ hash_map::Entry::Vacant(_) => {
+ // If we reach this point, it means that the channel_id either refers to an unfunded channel or
+ // it does not exist for this peer. Either way, we can attempt to force-close it.
+ //
+ // An appropriate error will be returned for non-existence of the channel if that's the case.
+ return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
+ },
}
- // If we reach this point, it means that the channel_id either refers to an unfunded channel or
- // it does not exist for this peer. Either way, we can attempt to force-close it.
- //
- // An appropriate error will be returned for non-existence of the channel if that's the case.
- return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
- };
+ }
for htlc_source in failed_htlcs.drain(..) {
let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
}
- let _ = handle_error!(self, result, *counterparty_node_id);
+ if let Some(shutdown_result) = shutdown_result {
+ self.finish_close_channel(shutdown_result);
+ }
+
Ok(())
}
self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
}
- #[inline]
- fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
- let (monitor_update_option, mut failed_htlcs) = shutdown_res;
+ fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
+ debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+ #[cfg(debug_assertions)]
+ for (_, peer) in self.per_peer_state.read().unwrap().iter() {
+ debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
+ }
+
+ let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
for htlc_source in failed_htlcs.drain(..) {
let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
// ignore the result here.
let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
}
+ let mut shutdown_results = Vec::new();
+ if let Some(txid) = unbroadcasted_batch_funding_txid {
+ let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
+ let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ let mut has_uncompleted_channel = None;
+ for (channel_id, counterparty_node_id, state) in affected_channels {
+ if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+ let mut peer_state = peer_state_mutex.lock().unwrap();
+ if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) {
+ update_maps_on_chan_removal!(self, &chan.context());
+ self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure);
+ shutdown_results.push(chan.context_mut().force_shutdown(false));
+ }
+ }
+ has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state));
+ }
+ debug_assert!(
+ has_uncompleted_channel.unwrap_or(true),
+ "Closing a batch where all channels have completed initial monitor update",
+ );
+ }
+ for shutdown_result in shutdown_results.drain(..) {
+ self.finish_close_channel(shutdown_result);
+ }
}
/// `peer_msg` should be set when we receive a message from a peer, but not set when the
let peer_state_mutex = per_peer_state.get(peer_node_id)
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
let (update_opt, counterparty_node_id) = {
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
+ let mut peer_state = peer_state_mutex.lock().unwrap();
let closure_reason = if let Some(peer_msg) = peer_msg {
ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }
} else {
log_error!(self.logger, "Force-closing channel {}", channel_id);
self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason);
let mut chan_phase = remove_channel_phase!(self, chan_phase_entry);
+ mem::drop(peer_state);
+ mem::drop(per_peer_state);
match chan_phase {
ChannelPhase::Funded(mut chan) => {
- self.finish_force_close_channel(chan.context.force_shutdown(broadcast));
+ self.finish_close_channel(chan.context.force_shutdown(broadcast));
(self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
},
ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
- self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false));
+ self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
// Unfunded channel has no update
(None, chan_phase.context().get_counterparty_node_id())
},
}
};
if let Some(update) = update_opt {
- let mut peer_state = peer_state_mutex.lock().unwrap();
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
- });
+ // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
+ // not try to broadcast it via whatever peer we have.
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ let a_peer_state_opt = per_peer_state.get(peer_node_id)
+ .ok_or(per_peer_state.values().next());
+ if let Ok(a_peer_state_mutex) = a_peer_state_opt {
+ let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
+ a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
}
Ok(counterparty_node_id)
let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
(short_channel_id, amt_to_forward, outgoing_cltv_value),
- msgs::InboundOnionPayload::Receive { .. } =>
+ msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } =>
return Err(InboundOnionErr {
msg: "Final Node OnionHopData provided for us as an intermediary node",
err_code: 0x4000 | 22,
payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, ..
} =>
(payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata),
- _ =>
+ msgs::InboundOnionPayload::BlindedReceive {
+ amt_msat, total_msat, outgoing_cltv_value, payment_secret, ..
+ } => {
+ let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat };
+ (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None)
+ }
+ msgs::InboundOnionPayload::Forward { .. } => {
return Err(InboundOnionErr {
err_code: 0x4000|22,
err_data: Vec::new(),
msg: "Got non final data with an HMAC of 0",
- }),
+ })
+ },
};
// final_incorrect_cltv_expiry
if outgoing_cltv_value > cltv_expiry {
}
}
- let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) {
+ let next_hop = match onion_utils::decode_next_payment_hop(
+ shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac,
+ msg.payment_hash, &self.node_signer
+ ) {
Ok(res) => res,
Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
return_malformed_err!(err_msg, err_code);
// We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
// inbound channel's state.
onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
- onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
+ onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } |
+ onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } =>
+ {
return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
}
};
}, onion_packet, None, &self.fee_estimator, &self.logger);
match break_chan_phase_entry!(self, send_res, chan_phase_entry) {
Some(monitor_update) => {
- match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan_phase_entry) {
- Err(e) => break Err(e),
- Ok(false) => {
+ match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
+ false => {
// Note that MonitorUpdateInProgress here indicates (per function
// docs) that we will resend the commitment update once monitor
// updating completes. Therefore, we must return an error
// MonitorUpdateInProgress, below.
return Err(APIError::MonitorUpdateInProgress);
},
- Ok(true) => {},
+ true => {},
}
},
None => {},
/// In general, a path may raise:
/// * [`APIError::InvalidRoute`] when an invalid route or forwarding parameter (cltv_delta, fee,
/// node public key) is specified.
- /// * [`APIError::ChannelUnavailable`] if the next-hop channel is not available for updates
- /// (including due to previous monitor update failure or new permanent monitor update
- /// failure).
+ /// * [`APIError::ChannelUnavailable`] if the next-hop channel is not available as it has been
+ /// closed, doesn't exist, or the peer is currently disconnected.
/// * [`APIError::MonitorUpdateInProgress`] if a new monitor update failure prevented sending the
/// relevant updates.
///
outbound_payment::payment_is_probe(payment_hash, payment_id, self.probing_cookie_secret)
}
+ /// Sends payment probes over all paths of a route that would be used to pay the given
+ /// amount to the given `node_id`.
+ ///
+ /// See [`ChannelManager::send_preflight_probes`] for more information.
+ pub fn send_spontaneous_preflight_probes(
+ &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32,
+ liquidity_limit_multiplier: Option<u64>,
+ ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+ let payment_params =
+ PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta);
+
+ let route_params = RouteParameters::from_payment_params_and_value(payment_params, amount_msat);
+
+ self.send_preflight_probes(route_params, liquidity_limit_multiplier)
+ }
+
+ /// Sends payment probes over all paths of a route that would be used to pay a route found
+ /// according to the given [`RouteParameters`].
+ ///
+ /// This may be used to send "pre-flight" probes, i.e., to train our scorer before conducting
+ /// the actual payment. Note this is only useful if there likely is sufficient time for the
+ /// probe to settle before sending out the actual payment, e.g., when waiting for user
+ /// confirmation in a wallet UI.
+ ///
+ /// Otherwise, there is a chance the probe could take up some liquidity needed to complete the
+ /// actual payment. Users should therefore be cautious and might avoid sending probes if
+ /// liquidity is scarce and/or they don't expect the probe to return before they send the
+ /// payment. To mitigate this issue, channels with available liquidity less than the required
+ /// amount times the given `liquidity_limit_multiplier` won't be used to send pre-flight
+ /// probes. If `None` is given as `liquidity_limit_multiplier`, it defaults to `3`.
+ pub fn send_preflight_probes(
+ &self, route_params: RouteParameters, liquidity_limit_multiplier: Option<u64>,
+ ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+ let liquidity_limit_multiplier = liquidity_limit_multiplier.unwrap_or(3);
+
+ let payer = self.get_our_node_id();
+ let usable_channels = self.list_usable_channels();
+ let first_hops = usable_channels.iter().collect::<Vec<_>>();
+ let inflight_htlcs = self.compute_inflight_htlcs();
+
+ let route = self
+ .router
+ .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs)
+ .map_err(|e| {
+ log_error!(self.logger, "Failed to find path for payment probe: {:?}", e);
+ ProbeSendFailure::RouteNotFound
+ })?;
+
+ let mut used_liquidity_map = HashMap::with_capacity(first_hops.len());
+
+ let mut res = Vec::new();
+
+ for mut path in route.paths {
+ // If the last hop is probably an unannounced channel we refrain from probing all the
+ // way through to the end and instead probe up to the second-to-last channel.
+ while let Some(last_path_hop) = path.hops.last() {
+ if last_path_hop.maybe_announced_channel {
+ // We found a potentially announced last hop.
+ break;
+ } else {
+ // Drop the last hop, as it's likely unannounced.
+ log_debug!(
+ self.logger,
+ "Avoided sending payment probe all the way to last hop {} as it is likely unannounced.",
+ last_path_hop.short_channel_id
+ );
+ let final_value_msat = path.final_value_msat();
+ path.hops.pop();
+ if let Some(new_last) = path.hops.last_mut() {
+ new_last.fee_msat += final_value_msat;
+ }
+ }
+ }
+
+ if path.hops.len() < 2 {
+ log_debug!(
+ self.logger,
+ "Skipped sending payment probe over path with less than two hops."
+ );
+ continue;
+ }
+
+ if let Some(first_path_hop) = path.hops.first() {
+ if let Some(first_hop) = first_hops.iter().find(|h| {
+ h.get_outbound_payment_scid() == Some(first_path_hop.short_channel_id)
+ }) {
+ let path_value = path.final_value_msat() + path.fee_msat();
+ let used_liquidity =
+ used_liquidity_map.entry(first_path_hop.short_channel_id).or_insert(0);
+
+ if first_hop.next_outbound_htlc_limit_msat
+ < (*used_liquidity + path_value) * liquidity_limit_multiplier
+ {
+ log_debug!(self.logger, "Skipped sending payment probe to avoid putting channel {} under the liquidity limit.", first_path_hop.short_channel_id);
+ continue;
+ } else {
+ *used_liquidity += path_value;
+ }
+ }
+ }
+
+ res.push(self.send_probe(path).map_err(|e| {
+ log_error!(self.logger, "Failed to send pre-flight probe: {:?}", e);
+ ProbeSendFailure::SendingFailed(e)
+ })?);
+ }
+
+ Ok(res)
+ }
+
/// Handles the generation of a funding transaction, optionally (for tests) with a function
/// which checks the correctness of the funding transaction given the associated channel.
- fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
- &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
+ fn funding_transaction_generated_intern<FundingOutput: FnMut(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
+ &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool,
+ mut find_funding_output: FundingOutput,
) -> Result<(), APIError> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
let funding_txo = find_funding_output(&chan, &funding_transaction)?;
- let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
+ let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger)
.map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
let channel_id = chan.context.channel_id();
let user_id = chan.context.get_user_id();
#[cfg(test)]
pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
- self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
+ self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| {
Ok(OutPoint { txid: tx.txid(), index: output_index })
})
}
/// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
/// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
+ self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
+ }
+
+ /// Call this upon creation of a batch funding transaction for the given channels.
+ ///
+ /// Return values are identical to [`Self::funding_transaction_generated`], respective to
+ /// each individual channel and transaction output.
+ ///
+ /// Do NOT broadcast the funding transaction yourself. This batch funding transcaction
+ /// will only be broadcast when we have safely received and persisted the counterparty's
+ /// signature for each channel.
+ ///
+ /// If there is an error, all channels in the batch are to be considered closed.
+ pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+ let mut result = Ok(());
if !funding_transaction.is_coin_base() {
for inp in funding_transaction.input.iter() {
if inp.witness.is_empty() {
- return Err(APIError::APIMisuseError {
+ result = result.and(Err(APIError::APIMisuseError {
err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
- });
+ }));
}
}
}
+ if funding_transaction.output.len() > u16::max_value() as usize {
+ result = result.and(Err(APIError::APIMisuseError {
+ err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
+ }));
+ }
{
let height = self.best_block.read().unwrap().height();
// Transactions are evaluated as final by network mempools if their locktime is strictly
// node might not have perfect sync about their blockchain views. Thus, if the wallet
// module is ahead of LDK, only allow one more block of headroom.
if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 {
- return Err(APIError::APIMisuseError {
+ result = result.and(Err(APIError::APIMisuseError {
err: "Funding transaction absolute timelock is non-final".to_owned()
- });
+ }));
}
}
- self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
- if tx.output.len() > u16::max_value() as usize {
- return Err(APIError::APIMisuseError {
- err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
- });
- }
- let mut output_index = None;
- let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
- for (idx, outp) in tx.output.iter().enumerate() {
- if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
- if output_index.is_some() {
+ let txid = funding_transaction.txid();
+ let is_batch_funding = temporary_channels.len() > 1;
+ let mut funding_batch_states = if is_batch_funding {
+ Some(self.funding_batch_states.lock().unwrap())
+ } else {
+ None
+ };
+ let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| {
+ match states.entry(txid) {
+ btree_map::Entry::Occupied(_) => {
+ result = result.clone().and(Err(APIError::APIMisuseError {
+ err: "Batch funding transaction with the same txid already exists".to_owned()
+ }));
+ None
+ },
+ btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
+ }
+ });
+ for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() {
+ result = result.and_then(|_| self.funding_transaction_generated_intern(
+ temporary_channel_id,
+ counterparty_node_id,
+ funding_transaction.clone(),
+ is_batch_funding,
+ |chan, tx| {
+ let mut output_index = None;
+ let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
+ for (idx, outp) in tx.output.iter().enumerate() {
+ if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
+ if output_index.is_some() {
+ return Err(APIError::APIMisuseError {
+ err: "Multiple outputs matched the expected script and value".to_owned()
+ });
+ }
+ output_index = Some(idx as u16);
+ }
+ }
+ if output_index.is_none() {
return Err(APIError::APIMisuseError {
- err: "Multiple outputs matched the expected script and value".to_owned()
+ err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
});
}
- output_index = Some(idx as u16);
+ let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
+ if let Some(funding_batch_state) = funding_batch_state.as_mut() {
+ funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false));
+ }
+ Ok(outpoint)
+ })
+ );
+ }
+ if let Err(ref e) = result {
+ // Remaining channels need to be removed on any error.
+ let e = format!("Error in transaction funding: {:?}", e);
+ let mut channels_to_remove = Vec::new();
+ channels_to_remove.extend(funding_batch_states.as_mut()
+ .and_then(|states| states.remove(&txid))
+ .into_iter().flatten()
+ .map(|(chan_id, node_id, _state)| (chan_id, node_id))
+ );
+ channels_to_remove.extend(temporary_channels.iter()
+ .map(|(&chan_id, &node_id)| (chan_id, node_id))
+ );
+ let mut shutdown_results = Vec::new();
+ {
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ for (channel_id, counterparty_node_id) in channels_to_remove {
+ per_peer_state.get(&counterparty_node_id)
+ .map(|peer_state_mutex| peer_state_mutex.lock().unwrap())
+ .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id))
+ .map(|mut chan| {
+ update_maps_on_chan_removal!(self, &chan.context());
+ self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() });
+ shutdown_results.push(chan.context_mut().force_shutdown(false));
+ });
}
}
- if output_index.is_none() {
- return Err(APIError::APIMisuseError {
- err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
- });
+ for shutdown_result in shutdown_results.drain(..) {
+ self.finish_close_channel(shutdown_result);
}
- Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
- })
+ }
+ result
}
/// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode);
if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) {
let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes();
- let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
+ let next_hop = match onion_utils::decode_next_payment_hop(
+ phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac,
+ payment_hash, &self.node_signer
+ ) {
Ok(res) => res,
Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
},
BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => {
let mut updated_chan = false;
- let res = {
+ {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) {
hash_map::Entry::Occupied(mut chan_phase) => {
- updated_chan = true;
- handle_new_monitor_update!(self, funding_txo, update.clone(),
- peer_state_lock, peer_state, per_peer_state, chan_phase).map(|_| ())
+ if let ChannelPhase::Funded(chan) = chan_phase.get_mut() {
+ updated_chan = true;
+ handle_new_monitor_update!(self, funding_txo, update.clone(),
+ peer_state_lock, peer_state, per_peer_state, chan);
+ } else {
+ debug_assert!(false, "We shouldn't have an update for a non-funded channel");
+ }
},
- hash_map::Entry::Vacant(_) => Ok(()),
+ hash_map::Entry::Vacant(_) => {},
}
- } else { Ok(()) }
- };
+ }
+ }
if !updated_chan {
// TODO: Track this as in-flight even though the channel is closed.
let _ = self.chain_monitor.update_channel(funding_txo, &update);
}
- // TODO: If this channel has since closed, we're likely providing a payment
- // preimage update, which we must ensure is durable! We currently don't,
- // however, ensure that.
- if res.is_err() {
- log_error!(self.logger,
- "Failed to provide ChannelMonitorUpdate to closed channel! This likely lost us a payment preimage!");
- }
- let _ = handle_error!(self, res, counterparty_node_id);
},
BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
let per_peer_state = self.per_peer_state.read().unwrap();
let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
let mut timed_out_mpp_htlcs = Vec::new();
let mut pending_peers_awaiting_removal = Vec::new();
+ let mut shutdown_channels = Vec::new();
- let process_unfunded_channel_tick = |
+ let mut process_unfunded_channel_tick = |
chan_id: &ChannelId,
context: &mut ChannelContext<SP>,
unfunded_context: &mut UnfundedChannelContext,
"Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id);
update_maps_on_chan_removal!(self, &context);
self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed);
- self.finish_force_close_channel(context.force_shutdown(false));
+ shutdown_channels.push(context.force_shutdown(false));
pending_msg_events.push(MessageSendEvent::HandleError {
node_id: counterparty_node_id,
action: msgs::ErrorAction::SendErrorMessage {
let _ = handle_error!(self, err, counterparty_node_id);
}
+ for shutdown_res in shutdown_channels {
+ self.finish_close_channel(shutdown_res);
+ }
+
self.pending_outbound_payments.remove_stale_payments(&self.pending_events);
// Technically we don't need to do this here, but if we have holding cell entries in a
// This ensures that future code doesn't introduce a lock-order requirement for
// `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
// this function with any `per_peer_state` peer lock acquired would.
+ #[cfg(debug_assertions)]
for (_, peer) in self.per_peer_state.read().unwrap().iter() {
debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
}
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
if !during_init {
- let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
- peer_state, per_peer_state, chan_phase_entry);
- if let Err(e) = res {
- // TODO: This is a *critical* error - we probably updated the outbound edge
- // of the HTLC's monitor with a preimage. We should retry this monitor
- // update over and over again until morale improves.
- log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
- return Err((counterparty_node_id, e));
- }
+ handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+ peer_state, per_peer_state, chan);
} else {
// If we're running during init we cannot update a monitor directly -
// they probably haven't actually been loaded yet. Instead, push the
self.pending_outbound_payments.finalize_claims(sources, &self.pending_events);
}
- fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_outpoint: OutPoint) {
+ fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
+ forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
+ next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
+ ) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
"We don't support claim_htlc claims during startup - monitors may not be available yet");
+ if let Some(pubkey) = next_channel_counterparty_node_id {
+ debug_assert_eq!(pubkey, path.hops[0].pubkey);
+ }
let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint: next_channel_outpoint,
counterparty_node_id: path.hops[0].pubkey,
},
HTLCSource::PreviousHopData(hop_data) => {
let prev_outpoint = hop_data.outpoint;
+ let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
let res = self.claim_funds_from_hop(hop_data, payment_preimage,
|htlc_claim_value_msat| {
if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
next_channel_id: Some(next_channel_outpoint.to_channel_id()),
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
},
- downstream_counterparty_and_funding_outpoint: None,
+ downstream_counterparty_and_funding_outpoint:
+ if let Some(node_id) = next_channel_counterparty_node_id {
+ Some((node_id, next_channel_outpoint, completed_blocker))
+ } else {
+ // We can only get `None` here if we are processing a
+ // `ChannelMonitor`-originated event, in which case we
+ // don't care about ensuring we wake the downstream
+ // channel's monitor updating - the channel is already
+ // closed.
+ None
+ },
})
} else { None }
});
Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
},
hash_map::Entry::Vacant(e) => {
- match self.id_to_peer.lock().unwrap().entry(chan.context.channel_id()) {
+ let mut id_to_peer_lock = self.id_to_peer.lock().unwrap();
+ match id_to_peer_lock.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
funding_msg.channel_id))
},
hash_map::Entry::Vacant(i_e) => {
- i_e.insert(chan.context.get_counterparty_node_id());
- }
- }
-
- // There's no problem signing a counterparty's funding transaction if our monitor
- // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
- // accepted payment from yet. We do, however, need to wait to send our channel_ready
- // until we have persisted our monitor.
- let new_channel_id = funding_msg.channel_id;
- peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
- node_id: counterparty_node_id.clone(),
- msg: funding_msg,
- });
+ let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+ if let Ok(persist_state) = monitor_res {
+ i_e.insert(chan.context.get_counterparty_node_id());
+ mem::drop(id_to_peer_lock);
+
+ // There's no problem signing a counterparty's funding transaction if our monitor
+ // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+ // accepted payment from yet. We do, however, need to wait to send our channel_ready
+ // until we have persisted our monitor.
+ peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+ node_id: counterparty_node_id.clone(),
+ msg: funding_msg,
+ });
- let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
-
- if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
- let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state,
- per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR,
- { peer_state.channel_by_id.remove(&new_channel_id) });
-
- // Note that we reply with the new channel_id in error messages if we gave up on the
- // channel, not the temporary_channel_id. This is compatible with ourselves, but the
- // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
- // any messages referencing a previously-closed channel anyway.
- // We do not propagate the monitor update to the user as it would be for a monitor
- // that we didn't manage to store (and that we don't care about - we don't respond
- // with the funding_signed so the channel can never go on chain).
- if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
- res.0 = None;
+ if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
+ handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
+ per_peer_state, chan, INITIAL_MONITOR);
+ } else {
+ unreachable!("This must be a funded channel as we just inserted it.");
+ }
+ Ok(())
+ } else {
+ log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated");
+ return Err(MsgHandleErrInternal::send_err_msg_no_close(
+ "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
+ funding_msg.channel_id));
+ }
}
- res.map(|_| ())
- } else {
- unreachable!("This must be a funded channel as we just inserted it.");
}
}
}
ChannelPhase::Funded(ref mut chan) => {
let monitor = try_chan_phase_entry!(self,
chan.funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan_phase_entry);
- let update_res = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor);
- let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan_phase_entry, INITIAL_MONITOR);
- if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
- // We weren't able to watch the channel to begin with, so no updates should be made on
- // it. Previously, full_stack_target found an (unreachable) panic when the
- // monitor update contained within `shutdown_finish` was applied.
- if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
- shutdown_finish.0.take();
- }
+ if let Ok(persist_status) = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor) {
+ handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR);
+ Ok(())
+ } else {
+ try_chan_phase_entry!(self, Err(ChannelError::Close("Channel funding outpoint was a duplicate".to_owned())), chan_phase_entry)
}
- res.map(|_| ())
},
_ => {
return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id));
}
fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
- let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
- let result: Result<(), _> = loop {
+ let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new();
+ let mut finish_shutdown = None;
+ {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
}
// Update the monitor with the shutdown script if necessary.
if let Some(monitor_update) = monitor_update_opt {
- break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ());
+ handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+ peer_state_lock, peer_state, per_peer_state, chan);
}
- break Ok(());
},
ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => {
let context = phase.context_mut();
log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id);
self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
let mut chan = remove_channel_phase!(self, chan_phase_entry);
- self.finish_force_close_channel(chan.context_mut().force_shutdown(false));
- return Ok(());
+ finish_shutdown = Some(chan.context_mut().force_shutdown(false));
},
}
} else {
return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
- };
+ }
for htlc_source in dropped_htlcs.drain(..) {
let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id: msg.channel_id };
let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
}
+ if let Some(shutdown_res) = finish_shutdown {
+ self.finish_close_channel(shutdown_res);
+ }
- result
+ Ok(())
}
fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
+ let mut shutdown_result = None;
+ let unbroadcasted_batch_funding_txid;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+ unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
if let Some(msg) = closing_signed {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
});
}
self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
+ shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
+ }
+ mem::drop(per_peer_state);
+ if let Some(shutdown_result) = shutdown_result {
+ self.finish_close_channel(shutdown_result);
}
Ok(())
}
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry);
+ if let HTLCSource::PreviousHopData(prev_hop) = &res.0 {
+ peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id)
+ .or_insert_with(Vec::new)
+ .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop));
+ }
+ // Note that we do not need to push an `actions_blocking_raa_monitor_updates`
+ // entry here, even though we *do* need to block the next RAA monitor update.
+ // We do this instead in the `claim_funds_internal` by attaching a
+ // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the
+ // outbound HTLC is claimed. This is guaranteed to all complete before we
+ // process the RAA as messages are processed from single peers serially.
funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded");
res
} else {
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
};
- self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo);
+ self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo);
Ok(())
}
let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &self.logger), chan_phase_entry);
if let Some(monitor_update) = monitor_update_opt {
handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock,
- peer_state, per_peer_state, chan_phase_entry).map(|_| ())
- } else { Ok(()) }
+ peer_state, per_peer_state, chan);
+ }
+ Ok(())
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got a commitment_signed message for an unfunded channel!".into())), chan_phase_entry);
})
}
+ #[cfg(any(test, feature = "_test_utils"))]
+ pub(crate) fn test_raa_monitor_updates_held(&self,
+ counterparty_node_id: PublicKey, channel_id: ChannelId
+ ) -> bool {
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+ let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+ let peer_state = &mut *peer_state_lck;
+
+ if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
+ return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+ chan.context().get_funding_txo().unwrap(), counterparty_node_id);
+ }
+ }
+ false
+ }
+
fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
- let (htlcs_to_fail, res) = {
+ let htlcs_to_fail = {
let per_peer_state = self.per_peer_state.read().unwrap();
let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
} else { false };
let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self,
chan.revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan_phase_entry);
- let res = if let Some(monitor_update) = monitor_update_opt {
+ if let Some(monitor_update) = monitor_update_opt {
let funding_txo = funding_txo_opt
.expect("Funding outpoint must have been set for RAA handling to succeed");
handle_new_monitor_update!(self, funding_txo, monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ())
- } else { Ok(()) };
- (htlcs_to_fail, res)
+ peer_state_lock, peer_state, per_peer_state, chan);
+ }
+ htlcs_to_fail
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got a revoke_and_ack message for an unfunded channel!".into())), chan_phase_entry);
}
};
self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
- res
+ Ok(())
}
fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
if were_node_one == msg_from_node_one {
return Ok(NotifyOption::SkipPersistNoEvents);
} else {
- log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
- try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
+ log_debug!(self.logger, "Received channel_update {:?} for channel {}.", msg, chan_id);
+ let did_change = try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
+ // If nothing changed after applying their update, we don't need to bother
+ // persisting.
+ if !did_change {
+ return Ok(NotifyOption::SkipPersistNoEvents);
+ }
}
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
match monitor_event {
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
- log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage);
- self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint);
+ log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
+ self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver);
}
},
- MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
- MonitorEvent::UpdateFailed(funding_outpoint) => {
+ MonitorEvent::HolderForceClosed(funding_outpoint) => {
let counterparty_node_id_opt = match counterparty_node_id {
Some(cp_id) => Some(cp_id),
None => {
msg: update
});
}
- let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
- ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
- } else {
- ClosureReason::CommitmentTxConfirmed
- };
- self.issue_channel_close_events(&chan.context, reason);
+ self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.context.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
}
for failure in failed_channels.drain(..) {
- self.finish_force_close_channel(failure);
+ self.finish_close_channel(failure);
}
has_pending_monitor_events
fn check_free_holding_cells(&self) -> bool {
let mut has_monitor_update = false;
let mut failed_htlcs = Vec::new();
- let mut handle_errors = Vec::new();
// Walk our list of channels and find any that need to update. Note that when we do find an
// update, if it includes actions that must be taken afterwards, we have to drop the
if let Some(monitor_update) = monitor_opt {
has_monitor_update = true;
- let channel_id: ChannelId = *channel_id;
- let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
- peer_state.channel_by_id.remove(&channel_id));
- if res.is_err() {
- handle_errors.push((counterparty_node_id, res));
- }
+ handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
+ peer_state_lock, peer_state, per_peer_state, chan);
continue 'peer_loop;
}
}
break 'peer_loop;
}
- let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
+ let has_update = has_monitor_update || !failed_htlcs.is_empty();
for (failures, channel_id, counterparty_node_id) in failed_htlcs.drain(..) {
self.fail_holding_cell_htlcs(failures, channel_id, &counterparty_node_id);
}
- for (counterparty_node_id, err) in handle_errors.drain(..) {
- let _ = handle_error!(self, err, counterparty_node_id);
- }
-
has_update
}
fn maybe_generate_initial_closing_signed(&self) -> bool {
let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
let mut has_update = false;
+ let mut shutdown_result = None;
+ let mut unbroadcasted_batch_funding_txid = None;
{
let per_peer_state = self.per_peer_state.read().unwrap();
peer_state.channel_by_id.retain(|channel_id, phase| {
match phase {
ChannelPhase::Funded(chan) => {
+ unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
Ok((msg_opt, tx_opt)) => {
if let Some(msg) = msg_opt {
log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
self.tx_broadcaster.broadcast_transactions(&[&tx]);
update_maps_on_chan_removal!(self, &chan.context);
+ shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
false
} else { true }
},
let _ = handle_error!(self, err, counterparty_node_id);
}
+ if let Some(shutdown_result) = shutdown_result {
+ self.finish_close_channel(shutdown_result);
+ }
+
has_update
}
counterparty_node_id, funding_txo, update
});
}
- self.finish_force_close_channel(failure);
+ self.finish_close_channel(failure);
}
}
/// operation. It will double-check that nothing *else* is also blocking the same channel from
/// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
- let mut errors = Vec::new();
loop {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() {
log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
channel_funding_outpoint.to_channel_id());
- if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
- peer_state_lck, peer_state, per_peer_state, chan_phase_entry)
- {
- errors.push((e, counterparty_node_id));
- }
+ handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
+ peer_state_lck, peer_state, per_peer_state, chan);
if further_update_exists {
// If there are more `ChannelMonitorUpdate`s to process, restart at the
// top of the loop.
}
break;
}
- for (err, counterparty_node_id) in errors {
- let res = Err::<(), _>(err);
- let _ = handle_error!(self, res, counterparty_node_id);
- }
}
fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
self, || NotifyOption::SkipPersistHandleEvents);
-
let mut failed_channels = Vec::new();
let mut per_peer_state = self.per_peer_state.write().unwrap();
let remove_peer = {
peer_state.channel_by_id.retain(|_, phase| {
let context = match phase {
ChannelPhase::Funded(chan) => {
- chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
- // We only retain funded channels that are not shutdown.
- if !chan.is_shutdown() {
+ if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() {
+ // We only retain funded channels that are not shutdown.
return true;
}
- &chan.context
+ &mut chan.context
},
// Unfunded channels will always be removed.
ChannelPhase::UnfundedOutboundV1(chan) => {
- &chan.context
+ &mut chan.context
},
ChannelPhase::UnfundedInboundV1(chan) => {
- &chan.context
+ &mut chan.context
},
};
// Clean up for removal.
update_maps_on_chan_removal!(self, &context);
self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer);
+ failed_channels.push(context.force_shutdown(false));
false
});
// Note that we don't bother generating any events for pre-accept channels -
mem::drop(per_peer_state);
for failure in failed_channels.drain(..) {
- self.finish_force_close_channel(failure);
+ self.finish_close_channel(failure);
}
}
(10, self.channel_value_satoshis, required),
(12, self.unspendable_punishment_reserve, option),
(14, user_channel_id_low, required),
- (16, self.next_outbound_htlc_limit_msat, required), // Forwards compatibility for removed balance_msat field.
+ (16, self.balance_msat, required),
(18, self.outbound_capacity_msat, required),
(19, self.next_outbound_htlc_limit_msat, required),
(20, self.inbound_capacity_msat, required),
(10, channel_value_satoshis, required),
(12, unspendable_punishment_reserve, option),
(14, user_channel_id_low, required),
- (16, _balance_msat, option), // Backwards compatibility for removed balance_msat field.
+ (16, balance_msat, required),
(18, outbound_capacity_msat, required),
// Note that by the time we get past the required read above, outbound_capacity_msat will be
// filled in, so we can safely unwrap it here.
let user_channel_id = user_channel_id_low as u128 +
((user_channel_id_high_opt.unwrap_or(0 as u64) as u128) << 64);
- let _balance_msat: Option<u64> = _balance_msat;
-
Ok(Self {
inbound_scid_alias,
channel_id: channel_id.0.unwrap(),
channel_value_satoshis: channel_value_satoshis.0.unwrap(),
unspendable_punishment_reserve,
user_channel_id,
+ balance_msat: balance_msat.0.unwrap(),
outbound_capacity_msat: outbound_capacity_msat.0.unwrap(),
next_outbound_htlc_limit_msat: next_outbound_htlc_limit_msat.0.unwrap(),
next_outbound_htlc_minimum_msat: next_outbound_htlc_minimum_msat.0.unwrap(),
}
number_of_funded_channels += peer_state.channel_by_id.iter().filter(
- |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false }
+ |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false }
).count();
}
let peer_state = &mut *peer_state_lock;
for channel in peer_state.channel_by_id.iter().filter_map(
|(_, phase)| if let ChannelPhase::Funded(channel) = phase {
- if channel.context.is_funding_initiated() { Some(channel) } else { None }
+ if channel.context.is_funding_broadcast() { Some(channel) } else { None }
} else { None }
) {
channel.write(writer)?;
log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
&channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
}
- let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true);
+ let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
+ if batch_funding_txid.is_some() {
+ return Err(DecodeError::InvalidValue);
+ }
if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id, funding_txo, update
if let Some(short_channel_id) = channel.context.get_short_channel_id() {
short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
}
- if channel.context.is_funding_initiated() {
+ if channel.context.is_funding_broadcast() {
id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id());
}
match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
pending_fee_msat: Some(path_fee),
total_msat: path_amt,
starting_block_height: best_block_height,
+ remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup
});
log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
path_amt, &htlc.payment_hash, log_bytes!(session_priv_bytes));
// downstream chan is closed (because we don't have a
// channel_id -> peer map entry).
counterparty_opt.is_none(),
+ counterparty_opt.cloned().or(monitor.get_counterparty_node_id()),
monitor.get_funding_txo().0))
} else { None }
} else {
event_persist_notifier: Notifier::new(),
needs_persist_flag: AtomicBool::new(false),
+ funding_batch_states: Mutex::new(BTreeMap::new()),
+
entropy_source: args.entropy_source,
node_signer: args.node_signer,
signer_provider: args.signer_provider,
channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}
- for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay {
+ for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay {
// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
// channel is closed we just assume that it probably came from an on-chain claim.
channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
- downstream_closed, downstream_funding);
+ downstream_closed, downstream_node_id, downstream_funding);
}
//TODO: Broadcast channel update for closed channels, but only after we've made a
// To start (1), send a regular payment but don't claim it.
let expected_route = [&nodes[1]];
- let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000);
+ let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &expected_route, 100_000);
// Next, attempt a keysend payment and make sure it fails.
let route_params = RouteParameters::from_payment_params_and_value(
TEST_FINAL_CLTV, false), 100_000);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+ None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
let payment_preimage = PaymentPreimage([42; 32]);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+ None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+ None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let payment_id_2 = PaymentId([45; 32]);
nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
let route = find_route(
&payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
- nodes[0].logger, &scorer, &(), &random_seed_bytes
+ nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let test_preimage = PaymentPreimage([42; 32]);
let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
let route = find_route(
&payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
- nodes[0].logger, &scorer, &(), &random_seed_bytes
+ nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let test_preimage = PaymentPreimage([42; 32]);