Add a test for MonitorEvent holding when they complete out-of-order
authorMatt Corallo <git@bluematt.me>
Tue, 19 Oct 2021 23:44:29 +0000 (23:44 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 6 Dec 2021 18:39:37 +0000 (18:39 +0000)
lightning/src/chain/chainmonitor.rs
lightning/src/util/test_utils.rs

index ccce772f8a1df927031a2e304e14dff52814283f..fc7e50a1cb26c2a8f3dff74b4d4f77f186fe148c 100644 (file)
@@ -728,15 +728,17 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
 #[cfg(test)]
 mod tests {
        use bitcoin::BlockHeader;
-       use ::{check_added_monitors, check_closed_broadcast, check_closed_event, expect_payment_sent};
-       use ::{get_local_commitment_txn, get_route_and_payment_hash, unwrap_send_err};
+       use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
+       use ::{expect_payment_sent, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+       use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
        use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
        use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
        use ln::channelmanager::PaymentSendFailure;
        use ln::features::InitFeatures;
        use ln::functional_test_utils::*;
+       use ln::msgs::ChannelMessageHandler;
        use util::errors::APIError;
-       use util::events::{ClosureReason, MessageSendEventsProvider};
+       use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
        use util::test_utils::{OnRegisterOutput, TxOutReference};
 
        /// Tests that in-block dependent transactions are processed by `block_connected` when not
@@ -782,6 +784,81 @@ mod tests {
                nodes[1].node.get_and_clear_pending_events();
        }
 
+       #[test]
+       fn test_async_ooo_offchain_updates() {
+               // Test that if we have multiple offchain updates being persisted and they complete
+               // out-of-order, the ChainMonitor waits until all have completed before informing the
+               // ChannelManager.
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+               create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+
+               // Route two payments to be claimed at the same time.
+               let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+               let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+
+               chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
+               chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+
+               nodes[1].node.claim_funds(payment_preimage_1);
+               check_added_monitors!(nodes[1], 1);
+               nodes[1].node.claim_funds(payment_preimage_2);
+               check_added_monitors!(nodes[1], 1);
+
+               chanmon_cfgs[1].persister.set_update_ret(Ok(()));
+
+               let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
+               assert_eq!(persistences.len(), 1);
+               let (funding_txo, updates) = persistences.iter().next().unwrap();
+               assert_eq!(updates.len(), 2);
+
+               // Note that updates is a HashMap so the ordering here is actually random. This shouldn't
+               // fail either way but if it fails intermittently it's depending on the ordering of updates.
+               let mut update_iter = updates.iter();
+               nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
+               assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
+               assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+               nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
+
+               // Now manually walk the commitment signed dance - because we claimed two payments
+               // back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
+
+               let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+               nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
+               expect_payment_sent_without_paths!(nodes[0], payment_preimage_1);
+               nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &updates.commitment_signed);
+               check_added_monitors!(nodes[0], 1);
+               let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
+
+               nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_first_raa);
+               check_added_monitors!(nodes[1], 1);
+               let bs_second_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+               nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_first_update);
+               check_added_monitors!(nodes[1], 1);
+               let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
+
+               nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_second_updates.update_fulfill_htlcs[0]);
+               expect_payment_sent_without_paths!(nodes[0], payment_preimage_2);
+               nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_second_updates.commitment_signed);
+               check_added_monitors!(nodes[0], 1);
+               nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
+               expect_payment_path_successful!(nodes[0]);
+               check_added_monitors!(nodes[0], 1);
+               let (as_second_raa, as_second_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
+
+               nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_second_raa);
+               check_added_monitors!(nodes[1], 1);
+               nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_second_update);
+               check_added_monitors!(nodes[1], 1);
+               let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
+
+               nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_second_raa);
+               expect_payment_path_successful!(nodes[0]);
+               check_added_monitors!(nodes[0], 1);
+       }
+
        fn do_chainsync_pauses_events(block_timeout: bool) {
                // When a chainsync monitor update occurs, any MonitorUpdates should be held before being
                // passed upstream to a `ChannelManager` via `Watch::release_pending_monitor_events`. This
index cc8c206980176bd030ca4de08645583fcb3e58d1..6442c9cfa27bd6b7b8a28efaf45008ab0d8cdd52 100644 (file)
@@ -172,6 +172,9 @@ pub struct TestPersister {
        /// When we get an update_persisted_channel call with no ChannelMonitorUpdate, we insert the
        /// MonitorUpdateId here.
        pub chain_sync_monitor_persistences: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
+       /// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
+       /// MonitorUpdateId here.
+       pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
 }
 impl TestPersister {
        pub fn new() -> Self {
@@ -179,6 +182,7 @@ impl TestPersister {
                        update_ret: Mutex::new(Ok(())),
                        next_update_ret: Mutex::new(None),
                        chain_sync_monitor_persistences: Mutex::new(HashMap::new()),
+                       offchain_monitor_updates: Mutex::new(HashMap::new()),
                }
        }
 
@@ -206,6 +210,8 @@ impl<Signer: keysinterface::Sign> chainmonitor::Persist<Signer> for TestPersiste
                }
                if update.is_none() {
                        self.chain_sync_monitor_persistences.lock().unwrap().entry(funding_txo).or_insert(HashSet::new()).insert(update_id);
+               } else {
+                       self.offchain_monitor_updates.lock().unwrap().entry(funding_txo).or_insert(HashSet::new()).insert(update_id);
                }
                ret
        }