]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Fuzz reloading with a stale monitor in chanmon_consistency
authorMatt Corallo <git@bluematt.me>
Mon, 15 Apr 2024 19:25:05 +0000 (19:25 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 12 Jun 2024 15:32:33 +0000 (15:32 +0000)
Now that we are gearing up to support fully async monitor storage,
we really need to fuzz monitor updates not completing before a
reload, which we do here in the `chanmon_consistency` fuzzer.

While there are more parts to async monitor updating that we need
to fuzz, this at least gets us started by having basic async
restart cases handled. In the future, we should extend this to make
sure some basic properties (eg claim/balance consistency) remain
true through `chanmon_consistency` runs.

fuzz/src/chanmon_consistency.rs

index 1cdf617fa078ba60c87efa971888b98201e1142d..a4b68ae1978d0e84ed885252c350a8599c233210 100644 (file)
@@ -142,17 +142,29 @@ impl Writer for VecWriter {
        }
 }
 
+/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
+/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
+/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
+/// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
+///
+/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
+/// simply be replayed on startup.
+struct LatestMonitorState {
+       /// The latest monitor id which we told LDK we've persisted
+       persisted_monitor_id: u64,
+       /// The latest serialized `ChannelMonitor` that we told LDK we persisted.
+       persisted_monitor: Vec<u8>,
+       /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
+       /// from LDK's perspective.
+       pending_monitors: Vec<(u64, Vec<u8>)>,
+}
+
 struct TestChainMonitor {
        pub logger: Arc<dyn Logger>,
        pub keys: Arc<KeyProvider>,
        pub persister: Arc<TestPersister>,
        pub chain_monitor: Arc<chainmonitor::ChainMonitor<TestChannelSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
-       // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
-       // logic will automatically force-close our channels for us (as we don't have an up-to-date
-       // monitor implying we are not able to punish misbehaving counterparties). Because this test
-       // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
-       // fully-serialized monitor state here, as well as the corresponding update_id.
-       pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
+       pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
 }
 impl TestChainMonitor {
        pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@@ -169,22 +181,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
        fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
                let mut ser = VecWriter(Vec::new());
                monitor.write(&mut ser).unwrap();
-               if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+               let monitor_id = monitor.get_latest_update_id();
+               let res = self.chain_monitor.watch_channel(funding_txo, monitor);
+               let state = match res {
+                       Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
+                               LatestMonitorState {
+                                       persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
+                                       pending_monitors: Vec::new(),
+                               }
+                       },
+                       Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
+                               panic!("The test currently doesn't test initial-persistence via the async pipeline"),
+                       Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
+                       Err(()) => panic!(),
+               };
+               if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
                        panic!("Already had monitor pre-watch_channel");
                }
-               self.chain_monitor.watch_channel(funding_txo, monitor)
+               res
        }
 
        fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
                let mut map_lock = self.latest_monitors.lock().unwrap();
                let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
+               let latest_monitor_data = map_entry.pending_monitors.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
                let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
-                       read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
+                       read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
                deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
                let mut ser = VecWriter(Vec::new());
                deserialized_monitor.write(&mut ser).unwrap();
-               *map_entry = (update.update_id, ser.0);
-               self.chain_monitor.update_channel(funding_txo, update)
+               let res = self.chain_monitor.update_channel(funding_txo, update);
+               match res {
+                       chain::ChannelMonitorUpdateStatus::Completed => {
+                               map_entry.persisted_monitor_id = update.update_id;
+                               map_entry.persisted_monitor = ser.0;
+                       },
+                       chain::ChannelMonitorUpdateStatus::InProgress => {
+                               map_entry.pending_monitors.push((update.update_id, ser.0));
+                       },
+                       chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
+               }
+               res
        }
 
        fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
@@ -511,9 +548,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
 
                        let mut monitors = new_hash_map();
                        let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
-                       for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
-                               monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
-                               chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
+                       for (outpoint, mut prev_state) in old_monitors.drain() {
+                               monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+                                       &mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
+                               ).expect("Failed to read monitor").1);
+                               // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
+                               // considering them discarded. LDK should replay these for us as they're stored in
+                               // the `ChannelManager`.
+                               prev_state.pending_monitors.clear();
+                               chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
                        }
                        let mut monitor_refs = new_hash_map();
                        for (outpoint, monitor) in monitors.iter_mut() {
@@ -1040,6 +1083,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                        } }
                }
 
+               let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
+               let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
+               let complete_monitor_update = |
+                       monitor: &Arc<TestChainMonitor>, chan_funding,
+                       compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
+               | {
+                       if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+                               assert!(
+                                       state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
+                                       "updates should be sorted by id"
+                               );
+                               if let Some((id, data)) = compl_selector(&mut state.pending_monitors) {
+                                       monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+                                       if id > state.persisted_monitor_id {
+                                               state.persisted_monitor_id = id;
+                                               state.persisted_monitor = data;
+                                       }
+                               }
+                       }
+               };
+
+               let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
+                       if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+                               assert!(
+                                       state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
+                                       "updates should be sorted by id"
+                               );
+                               for (id, data) in state.pending_monitors.drain(..) {
+                                       monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+                                       if id > state.persisted_monitor_id {
+                                               state.persisted_monitor_id = id;
+                                               state.persisted_monitor = data;
+                                       }
+                               }
+                       }
+               };
+
                let v = get_slice!(1)[0];
                out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
                match v {
@@ -1054,30 +1134,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                        0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
                        0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
 
-                       0x08 => {
-                               if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[0].process_monitor_events();
-                               }
-                       },
-                       0x09 => {
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                       },
-                       0x0a => {
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                       },
-                       0x0b => {
-                               if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[2].process_monitor_events();
-                               }
-                       },
+                       0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
+                       0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
+                       0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
+                       0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
 
                        0x0c => {
                                if !chan_a_disconnected {
@@ -1285,119 +1345,35 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                        },
                        0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },
 
-                       0xf0 => {
-                               let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[0].process_monitor_events();
-                       }
-                       0xf1 => {
-                               let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[0].process_monitor_events();
-                       }
-                       0xf2 => {
-                               let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[0].process_monitor_events();
-                       }
+                       0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
+                       0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
+                       0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),
 
-                       0xf4 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xf5 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xf6 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
+                       0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
+                       0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
+                       0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),
 
-                       0xf8 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xf9 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xfa => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
+                       0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
+                       0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
+                       0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),
 
-                       0xfc => {
-                               let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[2].process_monitor_events();
-                       }
-                       0xfd => {
-                               let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[2].process_monitor_events();
-                       }
-                       0xfe => {
-                               let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[2].process_monitor_events();
-                       }
+                       0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
+                       0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
+                       0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),
 
                        0xff => {
                                // Test that no channel is in a stuck state where neither party can send funds even
                                // after we resolve all pending events.
-                               // First make sure there are no pending monitor updates, resetting the error state
-                               // and calling force_channel_monitor_updated for each monitor.
+                               // First make sure there are no pending monitor updates and further update
+                               // operations complete.
                                *monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
                                *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
                                *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
 
-                               if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[0].process_monitor_events();
-                               }
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                               if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[2].process_monitor_events();
-                               }
+                               complete_all_monitor_updates(&monitor_a, &chan_1_funding);
+                               complete_all_monitor_updates(&monitor_b, &chan_1_funding);
+                               complete_all_monitor_updates(&monitor_b, &chan_2_funding);
+                               complete_all_monitor_updates(&monitor_c, &chan_2_funding);
 
                                // Next, make sure peers are all connected to each other
                                if chan_a_disconnected {