/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
/// event can be generated.
- PaymentClaimed { payment_hash: PaymentHash },
+ PaymentClaimed {
+ payment_hash: PaymentHash,
+ /// A pending MPP claim which hasn't yet completed.
+ ///
+ /// Not written to disk.
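+ ///
+ /// Only set when the payment is claimed via more than one HTLC part, as single-part
+ /// claims need no cross-channel preimage tracking.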
+ pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+ },
/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
/// operation of another channel.
///
}
impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
- (0, PaymentClaimed) => { (0, payment_hash, required) },
+ (0, PaymentClaimed) => {
+ (0, payment_hash, required),
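+ // Never actually serialized: the absurdly large odd TLV type keeps this field out of
+ // the written stream, and (static_value, None) makes it always read back as None.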
+ (9999999999, pending_mpp_claim, (static_value, None)),
+ },
// Note that FreeOtherChannelImmediately should never be written - we were supposed to free
// *immediately*. However, for simplicity we implement read/write here.
(1, FreeOtherChannelImmediately) => {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
- let mut sources = {
+ let sources = {
let mut claimable_payments = self.claimable_payments.lock().unwrap();
if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
let mut receiver_node_id = self.our_network_pubkey;
return;
}
if valid_mpp {
- for htlc in sources.drain(..) {
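+ // Claims spanning more than one HTLC part need shared state tracking which channels
+ // still await the preimage; a single part's preimage is persisted by its own monitor
+ // update, so no tracking is needed.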
+ let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+ let channels_without_preimage = sources.iter().filter_map(|htlc| {
+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+ let prev_hop = &htlc.prev_hop;
+ Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
+ } else {
+ None
+ }
+ }).collect();
+ Some(Arc::new(Mutex::new(PendingMPPClaim {
+ channels_without_preimage,
+ channels_with_preimage: Vec::new(),
+ })))
+ } else {
+ None
+ };
+ for htlc in sources {
+ let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+ let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
+ Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+ } else {
+ None
+ }
+ );
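+ // Each part carries a blocker referencing the shared claim state, keeping RAA monitor
+ // updates on that part's channel blocked until the full MPP claim completes.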
+ let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+ pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+ }
+ });
self.claim_funds_from_hop(
htlc.prev_hop, payment_preimage,
|_, definitely_duplicate| {
debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
- Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+ (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker)
}
);
}
- }
- if !valid_mpp {
- for htlc in sources.drain(..) {
+ } else {
+ for htlc in sources {
let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
let source = HTLCSource::PreviousHopData(htlc.prev_hop);
}
}
- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
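+ // The completion callback now returns both an optional completion action and an
+ // optional RAA blocker to install on the channel the claim was made over.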
+ fn claim_funds_from_hop<
+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+ >(
&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
completion_action: ComplFunc,
) {
match fulfill_res {
UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
- if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+ let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+ if let Some(action) = action_opt {
log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
chan_id, action);
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
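+ // Track the RAA blocker against the claimed-over channel; it is removed in
+ // `handle_monitor_update_completion_actions` once all MPP parts have their preimage.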
+ if let Some(raa_blocker) = raa_blocker_opt {
+ peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+ }
if !during_init {
handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
}
}
UpdateFulfillCommitFetch::DuplicateClaim {} => {
- let action = if let Some(action) = completion_action(None, true) {
+ let (action_opt, raa_blocker_opt) = completion_action(None, true);
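+ // For a duplicate claim the blocker should already have been installed when the claim
+ // first completed; just sanity-check that it is present.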
+ if let Some(raa_blocker) = raa_blocker_opt {
+ debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+ }
+ let action = if let Some(action) = action_opt {
action
} else {
return;
};
+
mem::drop(peer_state_lock);
log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
// generally always allowed to be duplicative (and it's specifically noted in
// `PaymentForwarded`).
- self.handle_monitor_update_completion_actions(completion_action(None, false));
+ let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+ if let Some(raa_blocker) = raa_blocker_opt {
+ let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+ // prev_hop.counterparty_node_id is always available for payments received after
+ // LDK 0.0.123, but for payments received on 0.0.123 or earlier and claimed after
+ // upgrading, we need to look up the counterparty in `action_opt`, if possible.
+ action_opt.as_ref().and_then(|action|
+ if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+ pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+ } else { None }
+ )
+ );
+ if let Some(counterparty_node_id) = counterparty_node_id {
+ // TODO: Avoid always blocking the world for the write lock here.
+ let mut per_peer_state = self.per_peer_state.write().unwrap();
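+ // The peer may have no entry in `per_peer_state` (e.g. it disconnected with no
+ // channels remaining), so insert a disconnected stub to hold the blocker.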
+ let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+ Mutex::new(PeerState {
+ channel_by_id: new_hash_map(),
+ inbound_channel_request_by_id: new_hash_map(),
+ latest_features: InitFeatures::empty(),
+ pending_msg_events: Vec::new(),
+ in_flight_monitor_updates: BTreeMap::new(),
+ monitor_update_blocked_actions: BTreeMap::new(),
+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
+ is_connected: false,
+ }));
+ let mut peer_state = peer_state_mutex.lock().unwrap();
+
+ peer_state.actions_blocking_raa_monitor_updates
+ .entry(prev_hop.channel_id)
+ .or_insert_with(Vec::new)
+ .push(raa_blocker);
+ } else {
+ debug_assert!(false,
+ "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+ }
+ }
+
+ self.handle_monitor_update_completion_actions(action_opt);
}
fn finalize_claims(&self, sources: Vec<HTLCSource>) {
}
}), "{:?}", *background_events);
}
- None
+ (None, None)
} else if definitely_duplicate {
if let Some(other_chan) = chan_to_release {
- Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+ (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
downstream_counterparty_node_id: other_chan.counterparty_node_id,
downstream_funding_outpoint: other_chan.funding_txo,
downstream_channel_id: other_chan.channel_id,
blocking_action: other_chan.blocking_action,
- })
- } else { None }
+ }), None)
+ } else { (None, None) }
} else {
let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
if let Some(claimed_htlc_value) = htlc_claim_value_msat {
} else { None };
debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
"skimmed_fee_msat must always be included in total_fee_earned_msat");
- Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+ (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
event: events::Event::PaymentForwarded {
prev_channel_id: Some(prev_channel_id),
next_channel_id: Some(next_channel_id),
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
},
downstream_counterparty_and_funding_outpoint: chan_to_release,
- })
+ }), None)
}
});
},
debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
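+ // (counterparty node id, funding outpoint, channel id, blocker) for each channel whose
+ // RAA blocker can be released because its MPP claim has fully completed.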
+ let mut freed_channels = Vec::new();
+
for action in actions.into_iter() {
match action {
- MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+ MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+ if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+ let mut peer_state = peer_state_mutex.lock().unwrap();
+ let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+ if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
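+ // For the blocker matching this claim, move this channel from
+ // channels_without_preimage to channels_with_preimage, dropping the blocker once
+ // no channel still awaits its preimage-carrying monitor update.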
+ blockers.get_mut().retain(|blocker|
+ if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+ if *pending_claim == claim_ptr {
+ let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+ let pending_claim_state = &mut *pending_claim_state_lock;
+ pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+ if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+ pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+ false
+ } else { true }
+ });
+ if pending_claim_state.channels_without_preimage.is_empty() {
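+ // Every part now has its preimage persisted; queue all involved channels for release.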
+ for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+ freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+ }
+ }
+ !pending_claim_state.channels_without_preimage.is_empty()
+ } else { true }
+ } else { true }
+ );
+ if blockers.get().is_empty() {
+ blockers.remove();
+ }
+ }
+ });
+ }
+
let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
if let Some(ClaimingPayment {
amount_msat,
},
}
}
+
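+ // With all locks released, unblock RAA monitor updates on the freed channels.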
+ for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+ self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+ }
}
/// Handles a channel reentering a functional state, either due to reconnect or a monitor