Block the mon update removing a preimage until upstream mon writes
authorMatt Corallo <git@bluematt.me>
Thu, 7 Sep 2023 02:22:52 +0000 (02:22 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 12 Sep 2023 19:03:17 +0000 (19:03 +0000)
When we forward a payment and receive an `update_fulfill_htlc`
message from the downstream channel, we immediately claim the HTLC
on the upstream channel, before even doing a `commitment_signed`
dance on the downstream channel. This implies that our
`ChannelMonitorUpdate`s "go out" in the right order - first we
ensure we'll get our money by writing the preimage down, then we
write the update that resolves giving money on the downstream node.

This is safe as long as `ChannelMonitorUpdate`s complete in the
order in which they are generated, but of course looking forward we
want to support asynchronous updates, which may complete in any
order.

Thus, here, we enforce the correct ordering by blocking the
downstream `ChannelMonitorUpdate` until the upstream one completes.
Like the `PaymentSent` event handling we do so only for the
`revoke_and_ack` `ChannelMonitorUpdate`, ensuring the
preimage-containing upstream update has a full RTT to complete
before we actually manage to slow anything down.

lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs

index cd125c3fd26f5834d96e4806dc50f22f2ef6156f..3001485b09739f95b2dad0539b29d54cdfffecd0 100644 (file)
@@ -3038,8 +3038,8 @@ fn test_blocked_chan_preimage_release() {
        let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
        let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
 
-       create_announced_chan_between_nodes(&nodes, 0, 1).2;
-       create_announced_chan_between_nodes(&nodes, 1, 2).2;
+       create_announced_chan_between_nodes(&nodes, 0, 1);
+       let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2).2;
 
        send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 5_000_000);
 
@@ -3068,11 +3068,20 @@ fn test_blocked_chan_preimage_release() {
        let as_htlc_fulfill_updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id());
        nodes[1].node.handle_update_fulfill_htlc(&nodes[0].node.get_our_node_id(), &as_htlc_fulfill_updates.update_fulfill_htlcs[0]);
        check_added_monitors(&nodes[1], 1); // We generate only a preimage monitor update
+       assert!(get_monitor!(nodes[1], chan_id_2).get_stored_preimages().contains_key(&payment_hash_2));
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
-       // Finish the CS dance between nodes[0] and nodes[1].
-       do_commitment_signed_dance(&nodes[1], &nodes[0], &as_htlc_fulfill_updates.commitment_signed, false, false);
+       // Finish the CS dance between nodes[0] and nodes[1]. Note that until the event handling, the
+       // update_fulfill_htlc + CS is held, even though the preimage is already on disk for the
+       // channel.
+       nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_htlc_fulfill_updates.commitment_signed);
+       check_added_monitors(&nodes[1], 1);
+       let (a, raa) = do_main_commitment_signed_dance(&nodes[1], &nodes[0], false);
+       assert!(a.is_none());
+
+       nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &raa);
        check_added_monitors(&nodes[1], 0);
+       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
        let events = nodes[1].node.get_and_clear_pending_events();
        assert_eq!(events.len(), 3);
@@ -3080,8 +3089,8 @@ fn test_blocked_chan_preimage_release() {
        if let Event::PaymentPathSuccessful { .. } = events[2] {} else { panic!(); }
        if let Event::PaymentForwarded { .. } = events[1] {} else { panic!(); }
 
-       // The event processing should release the last RAA update.
-       check_added_monitors(&nodes[1], 1);
+       // The event processing should release the last RAA updates on both channels.
+       check_added_monitors(&nodes[1], 2);
 
        // When we fetch the next update the message getter will generate the next update for nodes[2],
        // generating a further monitor update.
@@ -3092,3 +3101,128 @@ fn test_blocked_chan_preimage_release() {
        do_commitment_signed_dance(&nodes[2], &nodes[1], &bs_htlc_fulfill_updates.commitment_signed, false, false);
        expect_payment_sent(&nodes[2], payment_preimage_2, None, true, true);
 }
+
+fn do_test_inverted_mon_completion_order(complete_bc_commitment_dance: bool) {
+       // When we forward a payment and receive an `update_fulfill_htlc` message from the downstream
+       // channel, we immediately claim the HTLC on the upstream channel, before even doing a
+       // `commitment_signed` dance on the downstream channel. This implies that our
+       // `ChannelMonitorUpdate`s are generated in the right order - first we ensure we'll get our
+       // money, then we write the update that resolves the downstream node claiming their money. This
+       // is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are
+       // generated, but of course this may not be the case. For asynchronous update writes, we have
+       // to ensure monitor updates can block each other, preventing the inversion all together.
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+
+       let persister;
+       let new_chain_monitor;
+       let nodes_1_deserialized;
+
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       let chan_id_ab = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+       let chan_id_bc = create_announced_chan_between_nodes(&nodes, 1, 2).2;
+
+       // Route a payment from A, through B, to C, then claim it on C. Once we pass B the
+       // `update_fulfill_htlc` we have a monitor update for both of B's channels. We complete the one
+       // on the B<->C channel but leave the A<->B monitor update pending, then reload B.
+       let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 100_000);
+
+       let mon_ab = get_monitor!(nodes[1], chan_id_ab).encode();
+
+       nodes[2].node.claim_funds(payment_preimage);
+       check_added_monitors(&nodes[2], 1);
+       expect_payment_claimed!(nodes[2], payment_hash, 100_000);
+
+       chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       let cs_updates = get_htlc_update_msgs(&nodes[2], &nodes[1].node.get_our_node_id());
+       nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
+
+       // B generates a new monitor update for the A <-> B channel, but doesn't send the new messages
+       // for it since the monitor update is marked in-progress.
+       check_added_monitors(&nodes[1], 1);
+       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+       // Now step the Commitment Signed Dance between B and C forward a bit (or fully), ensuring we
+       // won't get the preimage when the nodes reconnect and we have to get it from the
+       // ChannelMonitor.
+       nodes[1].node.handle_commitment_signed(&nodes[2].node.get_our_node_id(), &cs_updates.commitment_signed);
+       check_added_monitors(&nodes[1], 1);
+       if complete_bc_commitment_dance {
+               let (bs_revoke_and_ack, bs_commitment_signed) = get_revoke_commit_msgs!(nodes[1], nodes[2].node.get_our_node_id());
+               nodes[2].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack);
+               check_added_monitors(&nodes[2], 1);
+               nodes[2].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_commitment_signed);
+               check_added_monitors(&nodes[2], 1);
+               let cs_raa = get_event_msg!(nodes[2], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
+
+               // At this point node B still hasn't persisted the `ChannelMonitorUpdate` with the
+               // preimage in the A <-> B channel, which will prevent it from persisting the
+               // `ChannelMonitorUpdate` for the B<->C channel here to avoid "losing" the preimage.
+               nodes[1].node.handle_revoke_and_ack(&nodes[2].node.get_our_node_id(), &cs_raa);
+               check_added_monitors(&nodes[1], 0);
+               assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+       }
+
+       // Now reload node B
+       let manager_b = nodes[1].node.encode();
+
+       let mon_bc = get_monitor!(nodes[1], chan_id_bc).encode();
+       reload_node!(nodes[1], &manager_b, &[&mon_ab, &mon_bc], persister, new_chain_monitor, nodes_1_deserialized);
+
+       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+       nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+
+       // If we used the latest ChannelManager to reload from, we should have both channels still
+       // live. The B <-> C channel's final RAA ChannelMonitorUpdate must still be blocked as
+       // before - the ChannelMonitorUpdate for the A <-> B channel hasn't completed.
+       // When we call `timer_tick_occurred` we will get that monitor update back, which we'll
+       // complete after reconnecting to our peers.
+       persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       nodes[1].node.timer_tick_occurred();
+       check_added_monitors(&nodes[1], 1);
+       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+       // Now reconnect B to both A and C. If the B <-> C commitment signed dance wasn't run to
+       // the end go ahead and do that, though the
+       // `pending_responding_commitment_signed_dup_monitor` in `reconnect_args` indicates that we
+       // expect to *not* receive the final RAA ChannelMonitorUpdate.
+       if complete_bc_commitment_dance {
+               reconnect_nodes(ReconnectArgs::new(&nodes[1], &nodes[2]));
+       } else {
+               let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[2]);
+               reconnect_args.pending_responding_commitment_signed.1 = true;
+               reconnect_args.pending_responding_commitment_signed_dup_monitor.1 = true;
+               reconnect_args.pending_raa = (false, true);
+               reconnect_nodes(reconnect_args);
+       }
+
+       reconnect_nodes(ReconnectArgs::new(&nodes[0], &nodes[1]));
+
+       // (Finally) complete the A <-> B ChannelMonitorUpdate, ensuring the preimage is durably on
+       // disk in the proper ChannelMonitor, unblocking the B <-> C ChannelMonitor updating
+       // process.
+       let (outpoint, _, ab_update_id) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id_ab).unwrap().clone();
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, ab_update_id).unwrap();
+
+       // When we fetch B's HTLC update messages here (now that the ChannelMonitorUpdate has
+       // completed), it will also release the final RAA ChannelMonitorUpdate on the B <-> C
+       // channel.
+       let bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id());
+       check_added_monitors(&nodes[1], 1);
+
+       nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
+       do_commitment_signed_dance(&nodes[0], &nodes[1], &bs_updates.commitment_signed, false, false);
+
+       expect_payment_forwarded!(nodes[1], &nodes[0], &nodes[2], Some(1_000), false, false);
+
+       // Finally, check that the payment was, ultimately, seen as sent by node A.
+       expect_payment_sent(&nodes[0], payment_preimage, None, true, true);
+}
+
+#[test]
+fn test_inverted_mon_completion_order() {
+       do_test_inverted_mon_completion_order(true);
+       do_test_inverted_mon_completion_order(false);
+}
index ed00282fb718d526aebb5ea3d41b4614732d4b67..d68de5de39fb409585bd0cbe7bd4fb9cef6d170f 100644 (file)
@@ -656,7 +656,6 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
 }
 
 impl RAAMonitorUpdateBlockingAction {
-       #[allow(unused)]
        fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
                Self::ForwardedPaymentInboundClaim {
                        channel_id: prev_hop.outpoint.to_channel_id(),
@@ -5175,11 +5174,17 @@ where
                self.pending_outbound_payments.finalize_claims(sources, &self.pending_events);
        }
 
-       fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_outpoint: OutPoint) {
+       fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
+               forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
+               next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
+       ) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
                                debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
                                        "We don't support claim_htlc claims during startup - monitors may not be available yet");
+                               if let Some(pubkey) = next_channel_counterparty_node_id {
+                                       debug_assert_eq!(pubkey, path.hops[0].pubkey);
+                               }
                                let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
                                        channel_funding_outpoint: next_channel_outpoint,
                                        counterparty_node_id: path.hops[0].pubkey,
@@ -5190,6 +5195,7 @@ where
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
+                               let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
                                let res = self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat| {
                                                if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
@@ -5205,7 +5211,17 @@ where
                                                                        next_channel_id: Some(next_channel_outpoint.to_channel_id()),
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
-                                                               downstream_counterparty_and_funding_outpoint: None,
+                                                               downstream_counterparty_and_funding_outpoint:
+                                                                       if let Some(node_id) = next_channel_counterparty_node_id {
+                                                                               Some((node_id, next_channel_outpoint, completed_blocker))
+                                                                       } else {
+                                                                               // We can only get `None` here if we are processing a
+                                                                               // `ChannelMonitor`-originated event, in which case we
+                                                                               // don't care about ensuring we wake the downstream
+                                                                               // channel's monitor updating - the channel is already
+                                                                               // closed.
+                                                                               None
+                                                                       },
                                                        })
                                                } else { None }
                                        });
@@ -6044,6 +6060,17 @@ where
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry);
+                                               if let HTLCSource::PreviousHopData(prev_hop) = &res.0 {
+                                                       peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id)
+                                                               .or_insert_with(Vec::new)
+                                                               .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop));
+                                               }
+                                               // Note that we do not need to push an `actions_blocking_raa_monitor_updates`
+                                               // entry here, even though we *do* need to block the next RAA monitor update.
+                                               // We do this instead in the `claim_funds_internal` by attaching a
+                                               // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the
+                                               // outbound HTLC is claimed. This is guaranteed to all complete before we
+                                               // process the RAA as messages are processed from single peers serially.
                                                funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded");
                                                res
                                        } else {
@@ -6054,7 +6081,7 @@ where
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
-               self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo);
+               self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo);
                Ok(())
        }
 
@@ -6256,6 +6283,23 @@ where
                })
        }
 
+       #[cfg(any(test, feature = "_test_utils"))]
+       pub(crate) fn test_raa_monitor_updates_held(&self,
+               counterparty_node_id: PublicKey, channel_id: ChannelId
+       ) -> bool {
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+                       let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+                       let peer_state = &mut *peer_state_lck;
+
+                       if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
+                               return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+                                       chan.context().get_funding_txo().unwrap(), counterparty_node_id);
+                       }
+               }
+               false
+       }
+
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
                let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6477,8 +6521,8 @@ where
                                match monitor_event {
                                        MonitorEvent::HTLCEvent(htlc_update) => {
                                                if let Some(preimage) = htlc_update.payment_preimage {
-                                                       log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage);
-                                                       self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint);
+                                                       log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
+                                                       self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
                                                } else {
                                                        log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
                                                        let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
@@ -9298,6 +9342,7 @@ where
                                                                        // downstream chan is closed (because we don't have a
                                                                        // channel_id -> peer map entry).
                                                                        counterparty_opt.is_none(),
+                                                                       counterparty_opt.cloned().or(monitor.get_counterparty_node_id()),
                                                                        monitor.get_funding_txo().0))
                                                        } else { None }
                                                } else {
@@ -9576,12 +9621,12 @@ where
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
 
-               for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay {
+               for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay {
                        // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
                        // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
                        // channel is closed we just assume that it probably came from an on-chain claim.
                        channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
-                               downstream_closed, downstream_funding);
+                               downstream_closed, downstream_node_id, downstream_funding);
                }
 
                //TODO: Broadcast channel update for closed channels, but only after we've made a
index ebb2bb2e06b08f4be8ba4642d55786422da4f5de..cd03ae0978e7a5782beb2a64164935c7d1c70f6f 100644 (file)
@@ -1772,20 +1772,7 @@ pub fn do_commitment_signed_dance(node_a: &Node<'_, '_, '_>, node_b: &Node<'_, '
        check_added_monitors!(node_a, 1);
 
        // If this commitment signed dance was due to a claim, don't check for an RAA monitor update.
-       let got_claim = node_a.node.pending_events.lock().unwrap().iter().any(|(ev, action)| {
-               let matching_action = if let Some(channelmanager::EventCompletionAction::ReleaseRAAChannelMonitorUpdate
-                       { channel_funding_outpoint, counterparty_node_id }) = action
-               {
-                       if channel_funding_outpoint.to_channel_id() == commitment_signed.channel_id {
-                               assert_eq!(*counterparty_node_id, node_b.node.get_our_node_id());
-                               true
-                       } else { false }
-               } else { false };
-               if matching_action {
-                       if let Event::PaymentSent { .. } = ev {} else { panic!(); }
-               }
-               matching_action
-       });
+       let got_claim = node_a.node.test_raa_monitor_updates_held(node_b.node.get_our_node_id(), commitment_signed.channel_id);
        if fail_backwards { assert!(!got_claim); }
        commitment_signed_dance!(node_a, node_b, (), fail_backwards, true, false, got_claim);