}
);
+/// The source argument which is passed to [`ChannelManager::claim_mpp_part`].
+///
+/// This is identical to [`MPPClaimHTLCSource`] except that [`Self::counterparty_node_id`] is an
+/// `Option`, whereas it is required in [`MPPClaimHTLCSource`]. In the future, we should ideally
+/// drop this and merge the two, however doing so may break upgrades for nodes which have pending
+/// forwarded payments.
+struct HTLCClaimSource {
+ counterparty_node_id: Option<PublicKey>,
+ funding_txo: OutPoint,
+ channel_id: ChannelId,
+ htlc_id: u64,
+}
+
+impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+ fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+ HTLCClaimSource {
+ counterparty_node_id: Some(o.counterparty_node_id),
+ funding_txo: o.funding_txo,
+ channel_id: o.channel_id,
+ htlc_id: o.htlc_id,
+ }
+ }
+}
+
#[derive(Clone, Debug, PartialEq, Eq)]
/// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is
-/// tracked in [`PendingMPPClaim`].
+/// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted
+/// to an [`HTLCClaimSource`] for claim replays on startup.
struct MPPClaimHTLCSource {
counterparty_node_id: PublicKey,
funding_txo: OutPoint,
>(
&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+ ) {
+ let counterparty_node_id =
+ match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+ Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+ None => None
+ };
+
+ let htlc_source = HTLCClaimSource {
+ counterparty_node_id,
+ funding_txo: prev_hop.outpoint,
+ channel_id: prev_hop.channel_id,
+ htlc_id: prev_hop.htlc_id,
+ };
+ self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+ }
+
+ fn claim_mpp_part<
+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+ >(
+ &self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+ payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
) {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!
{
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.channel_id;
- let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
- Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
- None => None
- };
- let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+ let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
|counterparty_node_id| per_peer_state.get(counterparty_node_id)
.map(|peer_mutex| peer_mutex.lock().unwrap())
).unwrap_or(None);
peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
}
if !during_init {
- handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+ handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
} else {
// If we're running during init we cannot update a monitor directly -
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
- funding_txo: prev_hop.outpoint,
+ funding_txo: prev_hop.funding_txo,
channel_id: prev_hop.channel_id,
update: monitor_update.clone(),
});
}
let preimage_update = ChannelMonitorUpdate {
update_id: CLOSED_CHANNEL_UPDATE_ID,
- counterparty_node_id: None,
+ counterparty_node_id: prev_hop.counterparty_node_id,
updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
payment_preimage,
payment_info,
if !during_init {
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
- let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+ let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
// complete the monitor update completion action from `completion_action`.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
- prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+ prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
)));
}
// Note that we do process the completion action here. This totally could be a
onion_fields,
payment_id,
}) = payment {
- self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+ let event = events::Event::PaymentClaimed {
payment_hash,
purpose,
amount_msat,
sender_intended_total_msat,
onion_fields,
payment_id,
- }, None));
+ };
+ let event_action = (event, None);
+ let mut pending_events = self.pending_events.lock().unwrap();
+ // If we're replaying a claim on startup we may end up duplicating an event
+ // that's already in our queue, so check before we push another one. The
+ // `payment_id` should suffice to ensure we never spuriously drop a second
+ // event for a duplicate payment.
+ if !pending_events.contains(&event_action) {
+ pending_events.push_back(event_action);
+ }
}
},
MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
};
for (_, monitor) in args.channel_monitors.iter() {
- for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
- let per_peer_state = channel_manager.per_peer_state.read().unwrap();
- let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
- let payment = claimable_payments.claimable_payments.remove(&payment_hash);
- mem::drop(claimable_payments);
- if let Some(payment) = payment {
- log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
- let mut claimable_amt_msat = 0;
- let mut receiver_node_id = Some(our_network_pubkey);
- let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
- if phantom_shared_secret.is_some() {
- let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
- .expect("Failed to get node_id for phantom node recipient");
- receiver_node_id = Some(phantom_pubkey)
- }
- for claimable_htlc in &payment.htlcs {
- claimable_amt_msat += claimable_htlc.value;
-
- // Add a holding-cell claim of the payment to the Channel, which should be
- // applied ~immediately on peer reconnection. Because it won't generate a
- // new commitment transaction we can just provide the payment preimage to
- // the corresponding ChannelMonitor and nothing else.
- //
- // We do so directly instead of via the normal ChannelMonitor update
- // procedure as the ChainMonitor hasn't yet been initialized, implying
- // we're not allowed to call it directly yet. Further, we do the update
- // without incrementing the ChannelMonitor update ID as there isn't any
- // reason to.
- // If we were to generate a new ChannelMonitor update ID here and then
- // crash before the user finishes block connect we'd end up force-closing
- // this channel as well. On the flip side, there's no harm in restarting
- // without the new monitor persisted - we'll end up right back here on
- // restart.
- let previous_channel_id = claimable_htlc.prev_hop.channel_id;
- let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
- .get(&claimable_htlc.prev_hop.outpoint).cloned();
- if let Some(peer_node_id) = peer_node_id_opt {
- let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
- let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
- channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+ for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
+ if !payment_claims.is_empty() {
+ for payment_claim in payment_claims {
+ if payment_claim.mpp_parts.is_empty() {
+ return Err(DecodeError::InvalidValue);
+ }
+ let pending_claims = PendingMPPClaim {
+ channels_without_preimage: payment_claim.mpp_parts.clone(),
+ channels_with_preimage: Vec::new(),
+ };
+ let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+ // While it may be duplicative to generate a PaymentClaimed here, trying to
+ // figure out if the user definitely saw it before shutdown would require some
+ // nontrivial logic and may break as we move away from regularly persisting
+ // ChannelManager. Instead, we rely on the users' event handler being
+ // idempotent and just blindly generate one no matter what, letting the
+ // preimages eventually timing out from ChannelMonitors to prevent us from
+ // doing so forever.
+
+ let claim_found =
+ channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+ payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+ &channel_manager.inbound_payment_id_secret, true,
+ );
+ if claim_found.is_err() {
+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+ match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+ hash_map::Entry::Occupied(_) => {
+ debug_assert!(false, "Entry was added in begin_claiming_payment");
+ return Err(DecodeError::InvalidValue);
+ },
+ hash_map::Entry::Vacant(entry) => {
+ entry.insert(payment_claim.claiming_payment);
+ },
}
}
- if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
- previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+
+ for part in payment_claim.mpp_parts.iter() {
+ let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+ part.counterparty_node_id, part.channel_id, part.htlc_id,
+ PendingMPPClaimPointer(Arc::clone(&ptr))
+ ));
+ let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+ pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+ }
+ );
+ // Note that we don't need to pass the `payment_info` here - its
+ // already (clearly) durably on disk in the `ChannelMonitor` so there's
+ // no need to worry about getting it into others.
+ channel_manager.claim_mpp_part(
+ part.into(), payment_preimage, None,
+ |_, _|
+ (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+ );
}
}
- let mut pending_events = channel_manager.pending_events.lock().unwrap();
- let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
- pending_events.push_back((events::Event::PaymentClaimed {
- receiver_node_id,
- payment_hash,
- purpose: payment.purpose,
- amount_msat: claimable_amt_msat,
- htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
- sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
- onion_fields: payment.onion_fields,
- payment_id: Some(payment_id),
- }, None));
+ } else {
+ let per_peer_state = channel_manager.per_peer_state.read().unwrap();
+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+ let payment = claimable_payments.claimable_payments.remove(&payment_hash);
+ mem::drop(claimable_payments);
+ if let Some(payment) = payment {
+ log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
+ let mut claimable_amt_msat = 0;
+ let mut receiver_node_id = Some(our_network_pubkey);
+ let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
+ if phantom_shared_secret.is_some() {
+ let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
+ .expect("Failed to get node_id for phantom node recipient");
+ receiver_node_id = Some(phantom_pubkey)
+ }
+ for claimable_htlc in &payment.htlcs {
+ claimable_amt_msat += claimable_htlc.value;
+
+ // Add a holding-cell claim of the payment to the Channel, which should be
+ // applied ~immediately on peer reconnection. Because it won't generate a
+ // new commitment transaction we can just provide the payment preimage to
+ // the corresponding ChannelMonitor and nothing else.
+ //
+ // We do so directly instead of via the normal ChannelMonitor update
+ // procedure as the ChainMonitor hasn't yet been initialized, implying
+ // we're not allowed to call it directly yet. Further, we do the update
+ // without incrementing the ChannelMonitor update ID as there isn't any
+ // reason to.
+ // If we were to generate a new ChannelMonitor update ID here and then
+ // crash before the user finishes block connect we'd end up force-closing
+ // this channel as well. On the flip side, there's no harm in restarting
+ // without the new monitor persisted - we'll end up right back here on
+ // restart.
+ let previous_channel_id = claimable_htlc.prev_hop.channel_id;
+ let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
+ .get(&claimable_htlc.prev_hop.outpoint).cloned();
+ if let Some(peer_node_id) = peer_node_id_opt {
+ let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
+ let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
+ channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+ }
+ }
+ if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+ previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+ }
+ }
+ let mut pending_events = channel_manager.pending_events.lock().unwrap();
+ let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
+ pending_events.push_back((events::Event::PaymentClaimed {
+ receiver_node_id,
+ payment_hash,
+ purpose: payment.purpose,
+ amount_msat: claimable_amt_msat,
+ htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
+ sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
+ onion_fields: payment.onion_fields,
+ payment_id: Some(payment_id),
+ }, None));
+ }
}
}
}
// Now restart nodes[3].
reload_node!(nodes[3], original_manager, &[&updated_monitor.0, &original_monitor.0], persister, new_chain_monitor, nodes_3_deserialized);
- // On startup the preimage should have been copied into the non-persisted monitor:
+ // Until the startup background events are processed (in `get_and_clear_pending_events`,
+ // below), the preimage is not copied to the non-persisted monitor...
assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
- assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+ assert_eq!(
+ get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash),
+ persist_both_monitors,
+ );
nodes[1].node.peer_disconnected(nodes[3].node.get_our_node_id());
nodes[2].node.peer_disconnected(nodes[3].node.get_our_node_id());
// During deserialization, we should have closed one channel and broadcast its latest
// commitment transaction. We should also still have the original PaymentClaimable event we
- // never finished processing.
+ // never finished processing as well as a PaymentClaimed event regenerated when we replayed the
+ // preimage onto the non-persisted monitor.
let events = nodes[3].node.get_and_clear_pending_events();
assert_eq!(events.len(), if persist_both_monitors { 4 } else { 3 });
if let Event::PaymentClaimable { amount_msat: 15_000_000, .. } = events[0] { } else { panic!(); }
if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[1] { } else { panic!(); }
if persist_both_monitors {
if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); }
- check_added_monitors(&nodes[3], 2);
+ if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); }
+ check_added_monitors(&nodes[3], 6);
} else {
- check_added_monitors(&nodes[3], 1);
+ if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); }
+ check_added_monitors(&nodes[3], 3);
}
+ // Now that we've processed background events, the preimage should have been copied into the
+ // non-persisted monitor:
+ assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
+ assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+
// On restart, we should also get a duplicate PaymentClaimed event as we persisted the
// ChannelManager prior to handling the original one.
if let Event::PaymentClaimed { payment_hash: our_payment_hash, amount_msat: 15_000_000, .. } =
nodes[0].node.handle_update_fulfill_htlc(nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
commitment_signed_dance!(nodes[0], nodes[2], cs_updates.commitment_signed, false, true);
expect_payment_sent!(nodes[0], payment_preimage);
+
+ // Ensure that the remaining channel is fully operation and not blocked (and that after a
+ // cycle of commitment updates the payment preimage is ultimately pruned).
+ send_payment(&nodes[0], &[&nodes[2], &nodes[3]], 100_000);
+ assert!(!get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
}
}