use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel};
use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
#[cfg(any(feature = "_test_utils", test))]
-use crate::ln::features::InvoiceFeatures;
+use crate::ln::features::Bolt11InvoiceFeatures;
use crate::routing::gossip::NetworkGraph;
-use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router};
+use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router};
use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
use crate::ln::msgs;
use crate::ln::onion_utils;
/// running normally, and specifically must be processed before any other non-background
/// [`ChannelMonitorUpdate`]s are applied.
enum BackgroundEvent {
- /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from
- /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public
- /// key to handle channel resumption, whereas if the channel has been force-closed we do not
- /// need the counterparty node_id.
+ /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
+ /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
+ /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the
+ /// channel has been force-closed we do not need the counterparty node_id.
///
/// Note that any such events are lost on shutdown, so in general they must be updates which
/// are regenerated on startup.
- ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
+ ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
/// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
/// channel to continue normal operation.
///
/// In general this should be used rather than
- /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the
+ /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the
/// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`]
/// error the other variant is acceptable.
///
funding_txo: OutPoint,
update: ChannelMonitorUpdate
},
+ /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have
+ /// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending
+ /// on a channel.
+ MonitorUpdatesComplete {
+ counterparty_node_id: PublicKey,
+ channel_id: [u8; 32],
+ },
}
#[derive(Debug)]
&& self.in_flight_monitor_updates.is_empty()
}
- // Returns a count of all channels we have with this peer, including pending channels.
+ // Returns a count of all channels we have with this peer, including unfunded channels.
fn total_channel_count(&self) -> usize {
self.channel_by_id.len() +
self.outbound_v1_channel_by_id.len() +
/// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,
- #[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool,
persistence_notifier: Notifier,
},
}
};
- ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, PREFUNDED) => {
+ ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, UNFUNDED) => {
match $err {
- // We should only ever have `ChannelError::Close` when prefunded channels error.
+ // We should only ever have `ChannelError::Close` when unfunded channels error.
// In any case, just close the channel.
ChannelError::Warn(msg) | ChannelError::Ignore(msg) | ChannelError::Close(msg) => {
- log_error!($self.logger, "Closing prefunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg);
+ log_error!($self.logger, "Closing unfunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg);
update_maps_on_chan_removal!($self, &$channel_context);
let shutdown_res = $channel_context.force_shutdown(false);
(true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel_context.get_user_id(),
match $res {
Ok(res) => res,
Err(e) => {
- let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), PREFUNDED);
+ let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), UNFUNDED);
if drop {
$entry.remove_entry();
}
// update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
// any case so that it won't deadlock.
debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
- #[cfg(debug_assertions)] {
- debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
- }
+ debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
match $update_res {
ChannelMonitorUpdateStatus::InProgress => {
log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
let mut pending_events = $self.pending_events.lock().unwrap();
pending_events.drain(..num_events);
processed_all_events = pending_events.is_empty();
+ // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
+ // updated here with the `pending_events` lock acquired.
$self.pending_events_processor.store(false, Ordering::Release);
}
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
- #[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
self.issue_channel_close_events(&chan.get().context, closure_reason);
let mut chan = remove_channel!(self, chan);
self.finish_force_close_channel(chan.context.force_shutdown(false));
- // Prefunded channel has no update
+ // Unfunded channel has no update
(None, chan.context.get_counterparty_node_id())
} else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) {
log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..]));
self.issue_channel_close_events(&chan.get().context, closure_reason);
let mut chan = remove_channel!(self, chan);
self.finish_force_close_channel(chan.context.force_shutdown(false));
- // Prefunded channel has no update
+ // Unfunded channel has no update
(None, chan.context.get_counterparty_node_id())
} else {
return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) });
/// irrevocably committed to on our end. In such a case, do NOT retry the payment with a
/// different route unless you intend to pay twice!
///
+ /// [`RouteHop`]: crate::routing::router::RouteHop
/// [`Event::PaymentSent`]: events::Event::PaymentSent
/// [`Event::PaymentFailed`]: events::Event::PaymentFailed
/// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs
fn process_background_events(&self) -> NotifyOption {
debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);
- #[cfg(debug_assertions)]
self.background_events_processed_since_startup.store(true, Ordering::Release);
let mut background_events = Vec::new();
for event in background_events.drain(..) {
match event {
- BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
+ BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
// The channel has already been closed, so no use bothering to care about the
// monitor updating completing.
let _ = self.chain_monitor.update_channel(funding_txo, &update);
}
let _ = handle_error!(self, res, counterparty_node_id);
},
+ BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) {
+ handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan);
+ } else {
+ let update_actions = peer_state.monitor_update_blocked_actions
+ .remove(&channel_id).unwrap_or(Vec::new());
+ mem::drop(peer_state_lock);
+ mem::drop(per_peer_state);
+ self.handle_monitor_update_completion_actions(update_actions);
+ }
+ }
+ },
}
}
NotifyOption::DoPersist
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let mut should_persist = self.process_background_events();
- let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+ let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+ let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
for (chan_id, chan) in peer_state.channel_by_id.iter_mut() {
+ let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
+ min_mempool_feerate
+ } else {
+ normal_feerate
+ };
let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
}
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let mut should_persist = self.process_background_events();
- let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+ let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+ let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
let mut timed_out_mpp_htlcs = Vec::new();
let pending_msg_events = &mut peer_state.pending_msg_events;
let counterparty_node_id = *counterparty_node_id;
peer_state.channel_by_id.retain(|chan_id, chan| {
+ let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
+ min_mempool_feerate
+ } else {
+ normal_feerate
+ };
let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!
+ // If we haven't yet run background events assume we're still deserializing and shouldn't
+ // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
+ // `BackgroundEvent`s.
+ let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
+
{
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.outpoint.to_channel_id();
log_bytes!(chan_id), action);
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
- let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
- peer_state, per_peer_state, chan);
- if let Err(e) = res {
- // TODO: This is a *critical* error - we probably updated the outbound edge
- // of the HTLC's monitor with a preimage. We should retry this monitor
- // update over and over again until morale improves.
- log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
- return Err((counterparty_node_id, e));
+ if !during_init {
+ let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+ peer_state, per_peer_state, chan);
+ if let Err(e) = res {
+ // TODO: This is a *critical* error - we probably updated the outbound edge
+ // of the HTLC's monitor with a preimage. We should retry this monitor
+ // update over and over again until morale improves.
+ log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+ return Err((counterparty_node_id, e));
+ }
+ } else {
+ // If we're running during init we cannot update a monitor directly -
+ // they probably haven't actually been loaded yet. Instead, push the
+ // monitor update as a background event.
+ self.pending_background_events.lock().unwrap().push(
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+ counterparty_node_id,
+ funding_txo: prev_hop.outpoint,
+ update: monitor_update.clone(),
+ });
}
}
return Ok(());
payment_preimage,
}],
};
- // We update the ChannelMonitor on the backward link, after
- // receiving an `update_fulfill_htlc` from the forward link.
- let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
- if update_res != ChannelMonitorUpdateStatus::Completed {
- // TODO: This needs to be handled somehow - if we receive a monitor update
- // with a preimage we *must* somehow manage to propagate it to the upstream
- // channel, or we must have an ability to receive the same event and try
- // again on restart.
- log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
- payment_preimage, update_res);
+
+ if !during_init {
+ // We update the ChannelMonitor on the backward link, after
+ // receiving an `update_fulfill_htlc` from the forward link.
+ let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+ if update_res != ChannelMonitorUpdateStatus::Completed {
+ // TODO: This needs to be handled somehow - if we receive a monitor update
+ // with a preimage we *must* somehow manage to propagate it to the upstream
+ // channel, or we must have an ability to receive the same event and try
+ // again on restart.
+ log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+ payment_preimage, update_res);
+ }
+ } else {
+ // If we're running during init we cannot update a monitor directly - they probably
+ // haven't actually been loaded yet. Instead, push the monitor update as a background
+ // event.
+ // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the
+ // channel is already closed) we need to ultimately handle the monitor update
+ // completion action only after we've completed the monitor update. This is the only
+ // way to guarantee this update *will* be regenerated on startup (otherwise if this was
+ // from a forwarded HTLC the downstream preimage may be deleted before we claim
+ // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
+ // complete the monitor update completion action from `completion_action`.
+ self.pending_background_events.lock().unwrap().push(
+ BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
+ prev_hop.outpoint, preimage_update,
+ )));
}
// Note that we do process the completion action here. This totally could be a
// duplicate claim, but we have no way of knowing without interrogating the
fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
+ debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
+ "We don't support claim_htlc claims during startup - monitors may not be available yet");
self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
},
HTLCSource::PreviousHopData(hop_data) => {
if peer_state_mutex_opt.is_none() { return }
peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
- let mut channel = {
- match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
- hash_map::Entry::Occupied(chan) => chan,
- hash_map::Entry::Vacant(_) => return,
- }
- };
+ let channel =
+ if let Some(chan) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) {
+ chan
+ } else {
+ let update_actions = peer_state.monitor_update_blocked_actions
+ .remove(&funding_txo.to_channel_id()).unwrap_or(Vec::new());
+ mem::drop(peer_state_lock);
+ mem::drop(per_peer_state);
+ self.handle_monitor_update_completion_actions(update_actions);
+ return;
+ };
let remaining_in_flight =
if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
pending.retain(|upd| upd.update_id > highest_applied_update_id);
pending.len()
} else { 0 };
log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.",
- highest_applied_update_id, channel.get().context.get_latest_monitor_update_id(),
+ highest_applied_update_id, channel.context.get_latest_monitor_update_id(),
remaining_in_flight);
- if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id {
+ if !channel.is_awaiting_monitor_update() || channel.context.get_latest_monitor_update_id() != highest_applied_update_id {
return;
}
- handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel.get_mut());
+ handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel);
}
/// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
}
}
- // We only want to push a PendingHTLCsForwardable event if no others are queued.
fn push_pending_forwards_ev(&self) {
let mut pending_events = self.pending_events.lock().unwrap();
- let forward_ev_exists = pending_events.iter()
- .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
- .is_some();
- if !forward_ev_exists {
- pending_events.push_back((events::Event::PendingHTLCsForwardable {
- time_forwardable:
- Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
+ let is_processing_events = self.pending_events_processor.load(Ordering::Acquire);
+ let num_forward_events = pending_events.iter().filter(|(ev, _)|
+ if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }
+ ).count();
+ // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing
+ // events is done in batches and they are not removed until we're done processing each
+ // batch. Since handling a `PendingHTLCsForwardable` event will call back into the
+ // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom
+ // payments will need an additional forwarding event before being claimed to make them look
+ // real by taking more time.
+ if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 {
+ pending_events.push_back((Event::PendingHTLCsForwardable {
+ time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
}, None));
}
}
/// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
- /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
+ /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action
/// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
/// the [`ChannelMonitorUpdate`] in question.
fn raa_monitor_updates_held(&self,
/// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
/// [`Event`] being handled) completes, this should be called to restore the channel to normal
/// operation. It will double-check that nothing *else* is also blocking the same channel from
- /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
+ /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
let mut errors = Vec::new();
loop {
provided_node_features(&self.default_configuration)
}
- /// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by
+ /// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by
/// [`ChannelManager`].
///
/// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice"
/// or not. Thus, this method is not public.
#[cfg(any(feature = "_test_utils", test))]
- pub fn invoice_features(&self) -> InvoiceFeatures {
+ pub fn invoice_features(&self) -> Bolt11InvoiceFeatures {
provided_invoice_features(&self.default_configuration)
}
let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) {
- if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) {
+ if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
node_id: *counterparty_node_id,
msg,
provided_init_features(config).to_context()
}
-/// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by
+/// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by
/// [`ChannelManager`].
///
/// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice"
/// or not. Thus, this method is not public.
#[cfg(any(feature = "_test_utils", test))]
-pub(crate) fn provided_invoice_features(config: &UserConfig) -> InvoiceFeatures {
+pub(crate) fn provided_invoice_features(config: &UserConfig) -> Bolt11InvoiceFeatures {
provided_init_features(config).to_context()
}
}
impl_writeable_tlv_based!(PhantomRouteHints, {
- (2, channels, vec_type),
+ (2, channels, required_vec),
(4, phantom_scid, required),
(6, real_node_pubkey, required),
});
0 => {
let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
let mut first_hop_htlc_msat: u64 = 0;
- let mut path_hops: Option<Vec<RouteHop>> = Some(Vec::new());
+ let mut path_hops = Vec::new();
let mut payment_id = None;
let mut payment_params: Option<PaymentParameters> = None;
let mut blinded_tail: Option<BlindedTail> = None;
(0, session_priv, required),
(1, payment_id, option),
(2, first_hop_htlc_msat, required),
- (4, path_hops, vec_type),
+ (4, path_hops, required_vec),
(5, payment_params, (option: ReadableArgs, 0)),
(6, blinded_tail, option),
});
// instead.
payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
}
- let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail };
+ let path = Path { hops: path_hops, blinded_tail };
if path.hops.len() == 0 {
return Err(DecodeError::InvalidValue);
}
(1, payment_id_opt, option),
(2, first_hop_htlc_msat, required),
// 3 was previously used to write a PaymentSecret for the payment.
- (4, path.hops, vec_type),
+ (4, path.hops, required_vec),
(5, None::<PaymentParameters>, option), // payment_params in LDK versions prior to 0.0.115
(6, path.blinded_tail, option),
});
(6, monitor_update_blocked_actions_per_peer, option),
(7, self.fake_scid_rand_bytes, required),
(8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
- (9, htlc_purposes, vec_type),
+ (9, htlc_purposes, required_vec),
(10, in_flight_monitor_updates, option),
(11, self.probing_cookie_secret, required),
(13, htlc_onion_fields, optional_vec),
update_id: CLOSED_CHANNEL_UPDATE_ID,
updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
};
- close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
+ close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
}
}
(6, monitor_update_blocked_actions_per_peer, option),
(7, fake_scid_rand_bytes, option),
(8, events_override, option),
- (9, claimable_htlc_purposes, vec_type),
+ (9, claimable_htlc_purposes, optional_vec),
(10, in_flight_monitor_updates, option),
(11, probing_cookie_secret, option),
(13, claimable_htlc_onion_fields, optional_vec),
update: update.clone(),
});
}
+ if $chan_in_flight_upds.is_empty() {
+ // We had some updates to apply, but it turns out they had completed before we
+ // were serialized, we just weren't notified of that. Thus, we may have to run
+ // the completion actions for any monitor updates, but otherwise are done.
+ pending_background_events.push(
+ BackgroundEvent::MonitorUpdatesComplete {
+ counterparty_node_id: $counterparty_node_id,
+ channel_id: $funding_txo.to_channel_id(),
+ });
+ }
if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!");
return Err(DecodeError::InvalidValue);
// Note that we have to do the above replays before we push new monitor updates.
pending_background_events.append(&mut close_background_events);
+ // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
+ // should ensure we try them again on the inbound edge. We put them here and do so after we
+ // have a fully-constructed `ChannelManager` at the end.
+ let mut pending_claims_to_replay = Vec::new();
+
{
// If we're tracking pending payments, ensure we haven't lost any by looking at the
// ChannelMonitor data for any channels for which we do not have authorative state
// We only rebuild the pending payments map if we were most recently serialized by
// 0.0.102+
for (_, monitor) in args.channel_monitors.iter() {
- if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
+ let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
+ if counterparty_opt.is_none() {
for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
if path.hops.is_empty() {
}
}
}
+
+ // Whether the downstream channel was closed or not, try to re-apply any payment
+ // preimages from it which may be needed in upstream channels for forwarded
+ // payments.
+ let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
+ .into_iter()
+ .filter_map(|(htlc_source, (htlc, preimage_opt))| {
+ if let HTLCSource::PreviousHopData(_) = htlc_source {
+ if let Some(payment_preimage) = preimage_opt {
+ Some((htlc_source, payment_preimage, htlc.amount_msat,
+ // Check if `counterparty_opt.is_none()` to see if the
+ // downstream chan is closed (because we don't have a
+ // channel_id -> peer map entry).
+ counterparty_opt.is_none(),
+ monitor.get_funding_txo().0.to_channel_id()))
+ } else { None }
+ } else {
+ // If it was an outbound payment, we've handled it above - if a preimage
+ // came in and we persisted the `ChannelManager` we either handled it and
+ // are good to go or the channel force-closed - we don't have to handle the
+ // channel still live case here.
+ None
+ }
+ });
+ for tuple in outbound_claimed_htlcs_iter {
+ pending_claims_to_replay.push(tuple);
+ }
}
}
blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
.entry(blocked_channel_outpoint.to_channel_id())
.or_insert_with(Vec::new).push(blocking_action.clone());
+ } else {
+ // If the channel we were blocking has closed, we don't need to
+ // worry about it - the blocked monitor update should never have
+ // been released from the `Channel` object so it can't have
+ // completed, and if the channel closed there's no reason to bother
+ // anymore.
}
}
}
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
- #[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}
+ for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
+ // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
+ // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
+ // channel is closed we just assume that it probably came from an on-chain claim.
+ channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
+ downstream_closed, downstream_chan_id);
+ }
+
//TODO: Broadcast channel update for closed channels, but only after we've made a
//connection or two.