/// usually because we're running pre-full-init. They are handled immediately once we detect we are
/// running normally, and specifically must be processed before any other non-background
/// [`ChannelMonitorUpdate`]s are applied.
+#[derive(Debug)]
enum BackgroundEvent {
/// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
event: events::Event,
downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
},
+ /// Indicates we should immediately resume the operation of another channel, unless there is
+ /// some other reason why the channel is blocked. In practice this simply means immediately
+ /// removing the [`RAAMonitorUpdateBlockingAction`] provided from the blocking set.
+ ///
+ /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
+ /// from completing a monitor update which removes the payment preimage until the inbound edge
+ /// completes a monitor update containing the payment preimage. However, we use this variant
+ /// instead of [`Self::EmitEventAndFreeOtherChannel`] when we discover that the claim was in
+ /// fact duplicative and we simply want to resume the outbound edge channel immediately.
+ ///
+ /// This variant should thus never be written to disk, as it is processed inline rather than
+ /// stored for later processing.
+ FreeOtherChannelImmediately {
+ downstream_counterparty_node_id: PublicKey,
+ downstream_funding_outpoint: OutPoint,
+ blocking_action: RAAMonitorUpdateBlockingAction,
+ },
}
impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
(0, PaymentClaimed) => { (0, payment_hash, required) },
+ // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
+ // *immediately*. However, for simplicity we implement read/write here.
+ (1, FreeOtherChannelImmediately) => {
+ (0, downstream_counterparty_node_id, required),
+ (2, downstream_funding_outpoint, required),
+ (4, blocking_action, required),
+ },
(2, EmitEventAndFreeOtherChannel) => {
(0, event, upgradable_required),
// LDK prior to 0.0.116 did not have this field as the monitor update application order was
for htlc in sources.drain(..) {
if let Err((pk, err)) = self.claim_funds_from_hop(
htlc.prev_hop, payment_preimage,
- |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
- {
+ |_, definitely_duplicate| {
+ debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
+ Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+ }
+ ) {
if let msgs::ErrorAction::IgnoreError = err.err.action {
// We got a temporary failure updating monitor, but will claim the
// HTLC when the monitor updating is restored (or on chain).
}
}
- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
+ fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!
// `BackgroundEvent`s.
let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
+ // As we may call handle_monitor_update_completion_actions in rather rare cases, check that
+ // the required mutexes are not held before we start.
+ debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+ debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+
{
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.outpoint.to_channel_id();
let counterparty_node_id = chan.context.get_counterparty_node_id();
let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
- if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
- if let Some(action) = completion_action(Some(htlc_value_msat)) {
- log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
- chan_id, action);
- peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+ match fulfill_res {
+ UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
+ if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+ log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+ chan_id, action);
+ peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+ }
+ if !during_init {
+ handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+ peer_state, per_peer_state, chan);
+ } else {
+ // If we're running during init we cannot update a monitor directly -
+ // they probably haven't actually been loaded yet. Instead, push the
+ // monitor update as a background event.
+ self.pending_background_events.lock().unwrap().push(
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+ counterparty_node_id,
+ funding_txo: prev_hop.outpoint,
+ update: monitor_update.clone(),
+ });
+ }
}
- if !during_init {
- handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
- peer_state, per_peer_state, chan);
- } else {
- // If we're running during init we cannot update a monitor directly -
- // they probably haven't actually been loaded yet. Instead, push the
- // monitor update as a background event.
- self.pending_background_events.lock().unwrap().push(
- BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
- counterparty_node_id,
- funding_txo: prev_hop.outpoint,
- update: monitor_update.clone(),
- });
+ UpdateFulfillCommitFetch::DuplicateClaim {} => {
+ let action = if let Some(action) = completion_action(None, true) {
+ action
+ } else {
+ return Ok(());
+ };
+ mem::drop(peer_state_lock);
+
+ log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
+ chan_id, action);
+ let (node_id, funding_outpoint, blocker) =
+ if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+ downstream_counterparty_node_id: node_id,
+ downstream_funding_outpoint: funding_outpoint,
+ blocking_action: blocker,
+ } = action {
+ (node_id, funding_outpoint, blocker)
+ } else {
+ debug_assert!(false,
+ "Duplicate claims should always free another channel immediately");
+ return Ok(());
+ };
+ if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
+ let mut peer_state = peer_state_mtx.lock().unwrap();
+ if let Some(blockers) = peer_state
+ .actions_blocking_raa_monitor_updates
+ .get_mut(&funding_outpoint.to_channel_id())
+ {
+ let mut found_blocker = false;
+ blockers.retain(|iter| {
+ // Note that we could actually be blocked, in
+ // which case we need to only remove the one
+ // blocker which was added duplicatively.
+ let first_blocker = !found_blocker;
+ if *iter == blocker { found_blocker = true; }
+ *iter != blocker || !first_blocker
+ });
+ debug_assert!(found_blocker);
+ }
+ } else {
+ debug_assert!(false);
+ }
}
}
}
// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
// generally always allowed to be duplicative (and it's specifically noted in
// `PaymentForwarded`).
- self.handle_monitor_update_completion_actions(completion_action(None));
+ self.handle_monitor_update_completion_actions(completion_action(None, false));
Ok(())
}
}
fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
- forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
+ forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, startup_replay: bool,
next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
) {
match source {
HTLCSource::PreviousHopData(hop_data) => {
let prev_outpoint = hop_data.outpoint;
let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
+ #[cfg(debug_assertions)]
+ let claiming_chan_funding_outpoint = hop_data.outpoint;
let res = self.claim_funds_from_hop(hop_data, payment_preimage,
- |htlc_claim_value_msat| {
- if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
- let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
- Some(claimed_htlc_value - forwarded_htlc_value)
- } else { None };
+ |htlc_claim_value_msat, definitely_duplicate| {
+ let chan_to_release =
+ if let Some(node_id) = next_channel_counterparty_node_id {
+ Some((node_id, next_channel_outpoint, completed_blocker))
+ } else {
+ // We can only get `None` here if we are processing a
+ // `ChannelMonitor`-originated event, in which case we
+ // don't care about ensuring we wake the downstream
+ // channel's monitor updating - the channel is already
+ // closed.
+ None
+ };
+ if definitely_duplicate && startup_replay {
+ // On startup we may get redundant claims which are related to
+ // monitor updates still in flight. In that case, we shouldn't
+ // immediately free, but instead let that monitor update complete
+ // in the background.
+ #[cfg(debug_assertions)] {
+ let background_events = self.pending_background_events.lock().unwrap();
+ // There should be a `BackgroundEvent` pending...
+ assert!(background_events.iter().any(|ev| {
+ match ev {
+ // to apply a monitor update that blocked the claiming channel,
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+ funding_txo, update, ..
+ } => {
+ if *funding_txo == claiming_chan_funding_outpoint {
+ assert!(update.updates.iter().any(|upd|
+ if let ChannelMonitorUpdateStep::PaymentPreimage {
+ payment_preimage: update_preimage
+ } = upd {
+ payment_preimage == *update_preimage
+ } else { false }
+ ), "{:?}", update);
+ true
+ } else { false }
+ },
+ // or the channel we'd unblock is already closed,
+ BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(
+ (funding_txo, monitor_update)
+ ) => {
+ if *funding_txo == next_channel_outpoint {
+ assert_eq!(monitor_update.updates.len(), 1);
+ assert!(matches!(
+ monitor_update.updates[0],
+ ChannelMonitorUpdateStep::ChannelForceClosed { .. }
+ ));
+ true
+ } else { false }
+ },
+ // or the monitor update has completed and will unblock
+ // immediately once we get going.
+ BackgroundEvent::MonitorUpdatesComplete {
+ channel_id, ..
+ } =>
+ *channel_id == claiming_chan_funding_outpoint.to_channel_id(),
+ }
+ }), "{:?}", *background_events);
+ }
+ None
+ } else if definitely_duplicate {
+ if let Some(other_chan) = chan_to_release {
+ Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+ downstream_counterparty_node_id: other_chan.0,
+ downstream_funding_outpoint: other_chan.1,
+ blocking_action: other_chan.2,
+ })
+ } else { None }
+ } else {
+ let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+ if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+ Some(claimed_htlc_value - forwarded_htlc_value)
+ } else { None }
+ } else { None };
Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
event: events::Event::PaymentForwarded {
fee_earned_msat,
next_channel_id: Some(next_channel_outpoint.to_channel_id()),
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
},
- downstream_counterparty_and_funding_outpoint:
- if let Some(node_id) = next_channel_counterparty_node_id {
- Some((node_id, next_channel_outpoint, completed_blocker))
- } else {
- // We can only get `None` here if we are processing a
- // `ChannelMonitor`-originated event, in which case we
- // don't care about ensuring we wake the downstream
- // channel's monitor updating - the channel is already
- // closed.
- None
- },
+ downstream_counterparty_and_funding_outpoint: chan_to_release,
})
- } else { None }
+ }
});
if let Err((pk, err)) = res {
let result: Result<(), _> = Err(err);
}
fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
+ debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+ debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+ debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+
for action in actions.into_iter() {
match action {
MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
}
},
+ MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+ downstream_counterparty_node_id, downstream_funding_outpoint, blocking_action,
+ } => {
+ self.handle_monitor_update_release(
+ downstream_counterparty_node_id,
+ downstream_funding_outpoint,
+ Some(blocking_action),
+ );
+ },
}
}
}
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry);
if let HTLCSource::PreviousHopData(prev_hop) = &res.0 {
+ log_trace!(self.logger,
+ "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+ msg.channel_id);
peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id)
.or_insert_with(Vec::new)
.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop));
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
};
- self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo);
+ self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, false, Some(*counterparty_node_id), funding_txo);
Ok(())
}
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
- self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
+ self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, false, counterparty_node_id, funding_outpoint);
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
} = action {
if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+ log_trace!(args.logger,
+ "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+ blocked_channel_outpoint.to_channel_id());
blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
.entry(blocked_channel_outpoint.to_channel_id())
.or_insert_with(Vec::new).push(blocking_action.clone());
// anymore.
}
}
+ if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { .. } = action {
+ debug_assert!(false, "Non-event-generating channel freeing should not appear in our queue");
+ }
}
}
peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
// channel is closed we just assume that it probably came from an on-chain claim.
channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
- downstream_closed, downstream_node_id, downstream_funding);
+ downstream_closed, true, downstream_node_id, downstream_funding);
}
//TODO: Broadcast channel update for closed channels, but only after we've made a