Fail holding-cell AddHTLCs on Channel deser to match disconnection 2020-11-holding-cell-add-panic
authorMatt Corallo <git@bluematt.me>
Thu, 19 Nov 2020 00:28:09 +0000 (19:28 -0500)
committerMatt Corallo <git@bluematt.me>
Thu, 19 Nov 2020 01:23:47 +0000 (20:23 -0500)
As Channel::write says in the comment at the top: "we write out as
if remove_uncommitted_htlcs_and_mark_paused had just been called",
except that we previously deliberately included holding-cell
AddHTLC events in the serialization. On the flip side, in
remove_uncommitted_htlcs_and_mark_paused, we removed pending
AddHTLC events under the assumption that, if we can't forward
something ASAP, its better to fail it back to the origin than to
sit on it for a while.

Given there's likely to be just as large a time-lag between
ser/deserialization as between when a peer dis/reconnects, there
isn't much of a reason for this difference. Worse, we debug_assert
that there are no pending AddHTLC holding cell events when doing a
reconnect, so any tests or fuzzers which deserialized a
ChannelManager with AddHTLC events would panic.

We resolve this by adding logic to fail any holding-cell AddHTLC
events upon deserialization, in part because trying to forward it
before we're sure we have an up-to-date chain is somewhat risky -
the sender may have already gone to chain while our upstream has
not.

lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs

index 689c3496de16ebfd83fdc782d7ddec931d36b0f9..945a5a5961f99bcbea05528ea488cfec31ada4f4 100644 (file)
@@ -18,15 +18,16 @@ use bitcoin::network::constants::Network;
 use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr};
 use chain::transaction::OutPoint;
 use chain::Watch;
-use ln::channelmanager::{RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
+use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
 use ln::features::InitFeatures;
 use ln::msgs;
 use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler};
 use routing::router::get_route;
+use util::config::UserConfig;
 use util::enforcing_trait_impls::EnforcingChannelKeys;
 use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 use util::errors::APIError;
-use util::ser::Readable;
+use util::ser::{Readable, ReadableArgs, Writeable};
 
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::Hash;
@@ -35,6 +36,8 @@ use ln::functional_test_utils::*;
 
 use util::test_utils;
 
+use std::collections::HashMap;
+
 // If persister_fail is true, we have the persister return a PermanentFailure
 // instead of the higher-level ChainMonitor.
 fn do_test_simple_monitor_permanent_update_fail(persister_fail: bool) {
@@ -1809,6 +1812,140 @@ fn monitor_update_claim_fail_no_response() {
        claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2, 1_000_000);
 }
 
+#[test]
+fn test_chan_reload_discard_outbound_holding() {
+       // Test that when we reload a ChannelManager from disk we discard (by failing backwards)
+       // outbound HTLCs sitting in the holding cell. We currently assert that there are no holding
+       // cell outbound HTLCs when we reconnect to a peer, so this would otherwise fail a
+       // debug_assertion, but its also good hygiene - if we are sitting on an HTLC when we reload,
+       // its reasonable to assume its been a while, and, short of having some criteria based on the
+       // CLTV value, trying to forward it likely doesn't make sense.
+       // chanmon_fail_consistency found the debug_assertion failure.
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let new_chain_monitor;
+       let node_state_0;
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+       create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
+       let logger = test_utils::TestLogger::new();
+
+       // Start forwarding a payment, skipping the first RAA so A is in AwaitingRAA
+       let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
+       {
+               let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
+               let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV, &logger).unwrap();
+               nodes[0].node.send_payment(&route, payment_hash_1, &None).unwrap();
+               check_added_monitors!(nodes[0], 1);
+       }
+
+       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let payment_event = SendEvent::from_event(events.pop().unwrap());
+       nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
+       nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg);
+       check_added_monitors!(nodes[1], 1);
+
+       let (bs_revoke_and_ack, bs_commitment_signed) = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+
+       // Now forward a second payment, getting it stuck in A's outbound holding cell.
+       let (_, payment_hash_2) = get_payment_preimage_hash!(nodes[0]);
+       {
+               let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
+               let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV, &logger).unwrap();
+               nodes[0].node.send_payment(&route, payment_hash_2, &None).unwrap();
+               check_added_monitors!(nodes[0], 0);
+       }
+
+       let node_state = nodes[0].node.encode();
+       let mut chain_monitor_state = test_utils::TestVecWriter(Vec::new());
+       let funding_outpoint = *nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().0;
+       nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().1.serialize_for_disk(&mut chain_monitor_state).unwrap();
+
+       // Now if we pass the RAA back to A it should free the holding cell outbound HTLC.
+       nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack);
+       check_added_monitors!(nodes[0], 1);
+       events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let payment_event = SendEvent::from_event(events.pop().unwrap());
+       assert_eq!(payment_event.msgs.len(), 1);
+
+       // Reload A's ChannelManager/Monitor and make sure the reload generates a PaymentFailed for the
+       // second payment.
+       let mut chain_monitor = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut ::std::io::Cursor::new(chain_monitor_state.0)).unwrap().1;
+       new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &nodes[0].logger, &node_cfgs[0].fee_estimator, &chanmon_cfgs[0].persister);
+       nodes[0].chain_monitor = &new_chain_monitor;
+       node_state_0 = {
+               let mut channel_monitors = HashMap::new();
+               channel_monitors.insert(funding_outpoint, &mut chain_monitor);
+               <(BlockHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut ::std::io::Cursor::new(node_state), ChannelManagerReadArgs {
+                       keys_manager: &nodes[0].keys_manager,
+                       fee_estimator: &node_cfgs[0].fee_estimator,
+                       chain_monitor: &nodes[0].chain_monitor,
+                       logger: &nodes[0].logger,
+                       tx_broadcaster: &nodes[0].tx_broadcaster,
+                       default_config: UserConfig::default(),
+                       channel_monitors,
+               }).unwrap().1
+       };
+       nodes[0].node = &node_state_0;
+       assert!(nodes[0].chain_monitor.watch_channel(funding_outpoint, chain_monitor).is_ok());
+       check_added_monitors!(nodes[0], 1);
+
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::PaymentFailed { ref payment_hash, rejected_by_dest, .. } => {
+                       assert_eq!(*payment_hash, payment_hash_2);
+                       assert!(!rejected_by_dest);
+               },
+               _ => panic!("Unexpected event"),
+       }
+
+       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+       nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
+       nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
+
+       let node_0_reestablish = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReestablish, nodes[1].node.get_our_node_id());
+       let node_1_reestablish = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
+
+       nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &node_1_reestablish);
+       nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &node_0_reestablish);
+
+       assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+       // Make sure nodes[1] rebroadcasts the undelivered messages:
+       let node_1_msgs = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(node_1_msgs.len(), 2);
+       match node_1_msgs[0] {
+               MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
+                       assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+                       assert!(*msg == bs_revoke_and_ack);
+               },
+               _ => panic!(),
+       }
+       match node_1_msgs[1] {
+               MessageSendEvent::UpdateHTLCs { ref node_id, ref updates } => {
+                       assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+                       assert!(updates.commitment_signed == bs_commitment_signed);
+               },
+               _ => panic!(),
+       }
+
+       nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack);
+       check_added_monitors!(nodes[0], 1);
+       nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_commitment_signed);
+       check_added_monitors!(nodes[0], 1);
+
+       nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()));
+       check_added_monitors!(nodes[1], 1);
+       expect_pending_htlcs_forwardable!(nodes[1]);
+       expect_payment_received!(nodes[1], payment_hash_1, 1_000_000);
+
+       claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_1, 1_000_000);
+}
+
 // confirm_a_first and restore_b_before_conf are wholly unrelated to earlier bools and
 // restore_b_before_conf has no meaning if !confirm_a_first
 fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: bool) {
index c34fc6a38c28e5ae89a5cd98e094ee7a5348781a..7be1f104eaea0792ab51682a46f2a7341b0ef1d2 100644 (file)
@@ -4064,8 +4064,8 @@ impl Readable for InboundHTLCRemovalReason {
 
 impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
-               // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
-               // called but include holding cell updates (and obviously we don't modify self).
+               // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused
+               // had just been called.
 
                writer.write_all(&[SERIALIZATION_VERSION; 1])?;
                writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
@@ -4156,13 +4156,10 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
                (self.holding_cell_htlc_updates.len() as u64).write(writer)?;
                for update in self.holding_cell_htlc_updates.iter() {
                        match update {
-                               &HTLCUpdateAwaitingACK::AddHTLC { ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet } => {
+                               &HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, ref source, .. } => {
                                        0u8.write(writer)?;
-                                       amount_msat.write(writer)?;
-                                       cltv_expiry.write(writer)?;
-                                       payment_hash.write(writer)?;
                                        source.write(writer)?;
-                                       onion_routing_packet.write(writer)?;
+                                       payment_hash.write(writer)?;
                                },
                                &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, ref htlc_id } => {
                                        1u8.write(writer)?;
@@ -4248,7 +4245,7 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
        }
 }
 
-impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
+impl<ChanSigner: ChannelKeys + Readable> Readable for (Channel<ChanSigner>, Vec<(HTLCSource, PaymentHash)>) {
        fn read<R : ::std::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
                let _ver: u8 = Readable::read(reader)?;
                let min_ver: u8 = Readable::read(reader)?;
@@ -4312,27 +4309,22 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
                        });
                }
 
+               let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new();
                let holding_cell_htlc_update_count: u64 = Readable::read(reader)?;
                let mut holding_cell_htlc_updates = Vec::with_capacity(cmp::min(holding_cell_htlc_update_count as usize, OUR_MAX_HTLCS as usize*2));
                for _ in 0..holding_cell_htlc_update_count {
-                       holding_cell_htlc_updates.push(match <u8 as Readable>::read(reader)? {
-                               0 => HTLCUpdateAwaitingACK::AddHTLC {
-                                       amount_msat: Readable::read(reader)?,
-                                       cltv_expiry: Readable::read(reader)?,
-                                       payment_hash: Readable::read(reader)?,
-                                       source: Readable::read(reader)?,
-                                       onion_routing_packet: Readable::read(reader)?,
-                               },
-                               1 => HTLCUpdateAwaitingACK::ClaimHTLC {
+                       match <u8 as Readable>::read(reader)? {
+                               0 => failed_htlcs.push((Readable::read(reader)?, Readable::read(reader)?)),
+                               1 => holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::ClaimHTLC {
                                        payment_preimage: Readable::read(reader)?,
                                        htlc_id: Readable::read(reader)?,
-                               },
-                               2 => HTLCUpdateAwaitingACK::FailHTLC {
+                               }),
+                               2 => holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::FailHTLC {
                                        htlc_id: Readable::read(reader)?,
                                        err_packet: Readable::read(reader)?,
-                               },
+                               }),
                                _ => return Err(DecodeError::InvalidValue),
-                       });
+                       }
                }
 
                let resend_order = match <u8 as Readable>::read(reader)? {
@@ -4398,7 +4390,7 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
                let counterparty_shutdown_scriptpubkey = Readable::read(reader)?;
                let commitment_secrets = Readable::read(reader)?;
 
-               Ok(Channel {
+               Ok((Channel {
                        user_id,
 
                        config,
@@ -4472,7 +4464,7 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
                        commitment_secrets,
 
                        network_sync: UpdateStatus::Fresh,
-               })
+               }, failed_htlcs))
        }
 }
 
index a884b5f72d870a173f2d27986522ce2394f136ad..b90f4e8e769cb426f95ef0ca8ce71b7fe049d0dd 100644 (file)
@@ -3871,14 +3871,20 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                let latest_block_height: u32 = Readable::read(reader)?;
                let last_block_hash: BlockHash = Readable::read(reader)?;
 
-               let mut failed_htlcs = Vec::new();
+               let mut perm_failed_htlcs = Vec::new();
+               let mut holding_cell_failed_htlcs = Vec::new();
 
                let channel_count: u64 = Readable::read(reader)?;
                let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
                let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                for _ in 0..channel_count {
-                       let mut channel: Channel<ChanSigner> = Readable::read(reader)?;
+                       let channel_and_failed_htlcs: (Channel<ChanSigner>, Vec<(HTLCSource, PaymentHash)>) = Readable::read(reader)?;
+                       let (mut channel, chan_failed_htlcs) = channel_and_failed_htlcs;
+                       for (_, ref payment_hash) in chan_failed_htlcs.iter() {
+                               log_trace!(args.logger, "Going to fail HTLC with hash {} which was pending-forwarding when we were serialized.", log_bytes!(&payment_hash.0[..]));
+                       }
+                       holding_cell_failed_htlcs.push((channel.channel_id(), chan_failed_htlcs));
                        if channel.last_block_connected != Default::default() && channel.last_block_connected != last_block_hash {
                                return Err(DecodeError::InvalidValue);
                        }
@@ -3898,7 +3904,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                                                channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
                                        // But if the channel is behind of the monitor, close the channel:
                                        let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
-                                       failed_htlcs.append(&mut new_failed_htlcs);
+                                       perm_failed_htlcs.append(&mut new_failed_htlcs);
                                        monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
                                } else {
                                        if let Some(short_channel_id) = channel.get_short_channel_id() {
@@ -3993,9 +3999,12 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        default_configuration: args.default_config,
                };
 
-               for htlc_source in failed_htlcs.drain(..) {
+               for htlc_source in perm_failed_htlcs.drain(..) {
                        channel_manager.fail_htlc_backwards_internal(channel_manager.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
+               for (chan_id, htlcs) in holding_cell_failed_htlcs.drain(..) {
+                       channel_manager.fail_holding_cell_htlcs(htlcs, chan_id);
+               }
 
                //TODO: Broadcast channel update for closed channels, but only after we've made a
                //connection or two.