Regenerate PendingHTLCsForwardable on reload instead of serializing 2021-09-forwardable-regen
authorMatt Corallo <git@bluematt.me>
Wed, 15 Sep 2021 19:20:44 +0000 (19:20 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 29 Sep 2021 19:20:38 +0000 (19:20 +0000)
When we are prepared to forward HTLCs, we generate a
PendingHTLCsForwardable event with a time in the future when the
user should tell us to forward. This provides some basic batching
of forward events, improving privacy slightly.

After we generate the event, we expect users to spawn a timer in
the background and let us know when it finishes. However, if the
user shuts down before the timer fires, the user will restart and
have no idea that HTLCs are waiting to be forwarded/received.

To fix this, instead of serializing PendingHTLCsForwardable events
to disk while they're pending (before the user starts the timer),
we simply regenerate them when a ChannelManager is deserialized
with HTLCs pending.

Fixes #1042

lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_tests.rs
lightning/src/util/events.rs

index b772c5e9ac75789f0876e2664bb87fdf2c8129a4..49a6bd0a427284c889f950e0fcde677706f2d72e 100644 (file)
@@ -5276,6 +5276,16 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                None => continue,
                        }
                }
+               if forward_htlcs_count > 0 {
+                       // If we have pending HTLCs to forward, assume we either dropped a
+                       // `PendingHTLCsForwardable` or the user received it but never processed it as they
+                       // shut down before the timer hit. Either way, set the time_forwardable to a small
+                       // constant as enough time has likely passed that we should simply handle the forwards
+                       // now, or at least after the user gets a chance to reconnect to our peers.
+                       pending_events_read.push(events::Event::PendingHTLCsForwardable {
+                               time_forwardable: Duration::from_secs(2),
+                       });
+               }
 
                let background_event_count: u64 = Readable::read(reader)?;
                let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
index e1a57cd7cc1533e084c955a69d6b3820043fa8a7..3c2a61e3aa2fa6ec8aa56a34506d2723d0f788e6 100644 (file)
@@ -9014,6 +9014,125 @@ fn test_tx_confirmed_skipping_blocks_immediate_broadcast() {
        do_test_tx_confirmed_skipping_blocks_immediate_broadcast(true);
 }
 
+#[test]
+fn test_forwardable_regen() {
+       // Tests that if we reload a ChannelManager while forwards are pending we will regenerate the
+       // PendingHTLCsForwardable event automatically, ensuring we don't forget to forward/receive
+       // HTLCs.
+       // We test it for both payment receipt and payment forwarding.
+
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let persister: test_utils::TestPersister;
+       let new_chain_monitor: test_utils::TestChainMonitor;
+       let nodes_1_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+       create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+       create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());
+
+       // First send a payment to nodes[1]
+       let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
+       nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret)).unwrap();
+       check_added_monitors!(nodes[0], 1);
+
+       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let payment_event = SendEvent::from_event(events.pop().unwrap());
+       nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
+       commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
+
+       expect_pending_htlcs_forwardable_ignore!(nodes[1]);
+
+       // Next send a payment which is forwarded by nodes[1]
+       let (route_2, payment_hash_2, payment_preimage_2, payment_secret_2) = get_route_and_payment_hash!(nodes[0], nodes[2], 200_000);
+       nodes[0].node.send_payment(&route_2, payment_hash_2, &Some(payment_secret_2)).unwrap();
+       check_added_monitors!(nodes[0], 1);
+
+       let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let payment_event = SendEvent::from_event(events.pop().unwrap());
+       nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
+       commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
+
+       // There is already a PendingHTLCsForwardable event "pending" so another one will not be
+       // generated
+       assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+
+       // Now restart nodes[1] and make sure it regenerates a single PendingHTLCsForwardable
+       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+       nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+
+       let nodes_1_serialized = nodes[1].node.encode();
+       let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
+       let mut chan_1_monitor_serialized = test_utils::TestVecWriter(Vec::new());
+       {
+               let monitors = nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap();
+               let mut monitor_iter = monitors.iter();
+               monitor_iter.next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
+               monitor_iter.next().unwrap().1.write(&mut chan_1_monitor_serialized).unwrap();
+       }
+
+       persister = test_utils::TestPersister::new();
+       let keys_manager = &chanmon_cfgs[1].keys_manager;
+       new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[1].chain_source), nodes[1].tx_broadcaster.clone(), nodes[1].logger, node_cfgs[1].fee_estimator, &persister, keys_manager);
+       nodes[1].chain_monitor = &new_chain_monitor;
+
+       let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
+       let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
+               &mut chan_0_monitor_read, keys_manager).unwrap();
+       assert!(chan_0_monitor_read.is_empty());
+       let mut chan_1_monitor_read = &chan_1_monitor_serialized.0[..];
+       let (_, mut chan_1_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
+               &mut chan_1_monitor_read, keys_manager).unwrap();
+       assert!(chan_1_monitor_read.is_empty());
+
+       let mut nodes_1_read = &nodes_1_serialized[..];
+       let (_, nodes_1_deserialized_tmp) = {
+               let mut channel_monitors = HashMap::new();
+               channel_monitors.insert(chan_0_monitor.get_funding_txo().0, &mut chan_0_monitor);
+               channel_monitors.insert(chan_1_monitor.get_funding_txo().0, &mut chan_1_monitor);
+               <(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut nodes_1_read, ChannelManagerReadArgs {
+                       default_config: UserConfig::default(),
+                       keys_manager,
+                       fee_estimator: node_cfgs[1].fee_estimator,
+                       chain_monitor: nodes[1].chain_monitor,
+                       tx_broadcaster: nodes[1].tx_broadcaster.clone(),
+                       logger: nodes[1].logger,
+                       channel_monitors,
+               }).unwrap()
+       };
+       nodes_1_deserialized = nodes_1_deserialized_tmp;
+       assert!(nodes_1_read.is_empty());
+
+       assert!(nodes[1].chain_monitor.watch_channel(chan_0_monitor.get_funding_txo().0, chan_0_monitor).is_ok());
+       assert!(nodes[1].chain_monitor.watch_channel(chan_1_monitor.get_funding_txo().0, chan_1_monitor).is_ok());
+       nodes[1].node = &nodes_1_deserialized;
+       check_added_monitors!(nodes[1], 2);
+
+       reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+       // Note that nodes[1] and nodes[2] resend their funding_locked here since they haven't updated
+       // the commitment state.
+       reconnect_nodes(&nodes[1], &nodes[2], (true, true), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+
+       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+       expect_pending_htlcs_forwardable!(nodes[1]);
+       expect_payment_received!(nodes[1], payment_hash, payment_secret, 100_000);
+       check_added_monitors!(nodes[1], 1);
+
+       let mut events = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let payment_event = SendEvent::from_event(events.pop().unwrap());
+       nodes[2].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &payment_event.msgs[0]);
+       commitment_signed_dance!(nodes[2], nodes[1], payment_event.commitment_msg, false);
+       expect_pending_htlcs_forwardable!(nodes[2]);
+       expect_payment_received!(nodes[2], payment_hash_2, payment_secret_2, 200_000);
+
+       claim_payment(&nodes[0], &[&nodes[1]], payment_preimage);
+       claim_payment(&nodes[0], &[&nodes[1], &nodes[2]], payment_preimage_2);
+}
+
 #[test]
 fn test_keysend_payments_to_public_node() {
        let chanmon_cfgs = create_chanmon_cfgs(2);
index d63dd88b76163c86fd16410e468eb03f7d95fcc6..7436f3c7e2044369ea97b879f1fbdb6bf613b41d 100644 (file)
@@ -240,9 +240,8 @@ impl Writeable for Event {
                        },
                        &Event::PendingHTLCsForwardable { time_forwardable: _ } => {
                                4u8.write(writer)?;
-                               write_tlv_fields!(writer, {});
-                               // We don't write the time_fordwardable out at all, as we presume when the user
-                               // deserializes us at least that much time has elapsed.
+                               // Note that we now ignore these on the read end as we'll re-generate them in
+                               // ChannelManager, we write them here only for backwards compatibility.
                        },
                        &Event::SpendableOutputs { ref outputs } => {
                                5u8.write(writer)?;
@@ -336,15 +335,7 @@ impl MaybeReadable for Event {
                                };
                                f()
                        },
-                       4u8 => {
-                               let f = || {
-                                       read_tlv_fields!(reader, {});
-                                       Ok(Some(Event::PendingHTLCsForwardable {
-                                               time_forwardable: Duration::from_secs(0)
-                                       }))
-                               };
-                               f()
-                       },
+                       4u8 => Ok(None),
                        5u8 => {
                                let f = || {
                                        let mut outputs = VecReadWrapper(Vec::new());