Fuzz reloading with a stale monitor in chanmon_consistency
author    Matt Corallo <git@bluematt.me>
          Mon, 15 Apr 2024 19:25:05 +0000 (19:25 +0000)
committer Matt Corallo <git@bluematt.me>
          Mon, 15 Apr 2024 21:07:52 +0000 (21:07 +0000)
Now that we are gearing up to support fully async monitor storage,
we really need to fuzz monitor updates not completing before a
reload, which we do here in the `chanmon_consistency` fuzzer.
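
For orientation, the heart of the patch is a small piece of per-channel
bookkeeping: the last fully persisted monitor plus a queue of
serialized-but-unpersisted updates. A minimal, self-contained sketch of that
idea (simplified stand-in types, not LDK's; only the `LatestMonitorState`
shape and field names come from the diff below):

    // Simplified stand-in for the per-channel state the patch adds.
    struct LatestMonitorState {
        persisted_monitor_id: u64,                    // update_id of the durable copy
        persisted_monitor: Vec<u8>,                   // serialized monitor as of that id
        pending_monitor_updates: Vec<(u64, Vec<u8>)>, // in-flight async persists, oldest first
    }

    impl LatestMonitorState {
        /// The freshest serialized monitor, whether or not it has finished persisting.
        fn latest(&self) -> &Vec<u8> {
            self.pending_monitor_updates.last().map(|(_, d)| d)
                .unwrap_or(&self.persisted_monitor)
        }
        /// Simulate a crash and reload: in-flight persists are lost, so the node
        /// comes back with the (possibly stale) persisted copy.
        fn reload(&mut self) -> &Vec<u8> {
            self.pending_monitor_updates.clear();
            &self.persisted_monitor
        }
    }

    fn main() {
        let mut state = LatestMonitorState {
            persisted_monitor_id: 1,
            persisted_monitor: vec![1],
            pending_monitor_updates: vec![(2, vec![2]), (3, vec![3])],
        };
        assert_eq!(state.latest(), &vec![3]); // newest in-flight update wins
        assert_eq!(state.reload(), &vec![1]); // after reload only the stale copy survives
    }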

fuzz/src/chanmon_consistency.rs

index 36e7cea8a2215e1414af4ef0f46a082401732c03..44fd0249c0d094bcf0ffbbb03dc77cb566b8dd0d 100644
@@ -138,6 +138,12 @@ impl Writer for VecWriter {
        }
 }
 
+struct LatestMonitorState {
+       persisted_monitor_id: u64,
+       persisted_monitor: Vec<u8>,
+       pending_monitor_updates: Vec<(u64, Vec<u8>)>,
+}
+
 struct TestChainMonitor {
        pub logger: Arc<dyn Logger>,
        pub keys: Arc<KeyProvider>,
@@ -148,7 +154,10 @@ struct TestChainMonitor {
        // monitor implying we are not able to punish misbehaving counterparties). Because this test
        // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
        // fully-serialized monitor state here, as well as the corresponding update_id.
-       pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
+       //
+       // Note that this doesn't apply to monitors which are pending persistence, so we store the
+       // latest pending monitor separately.
+       pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
 }
 impl TestChainMonitor {
        pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@@ -165,22 +174,48 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
        fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
                let mut ser = VecWriter(Vec::new());
                monitor.write(&mut ser).unwrap();
-               if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+               let monitor_id = monitor.get_latest_update_id();
+               let res = self.chain_monitor.watch_channel(funding_txo, monitor);
+               let state = match res {
+                       Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
+                               LatestMonitorState {
+                                       persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
+                                       pending_monitor_updates: Vec::new(),
+                               }
+                       },
+                       Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
+                               panic!("The test currently doesn't test initial-persistence via the async pipeline"),
+                       Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
+                       Err(()) => panic!(),
+               };
+               if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
                        panic!("Already had monitor pre-watch_channel");
                }
-               self.chain_monitor.watch_channel(funding_txo, monitor)
+               res
        }
 
        fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
                let mut map_lock = self.latest_monitors.lock().unwrap();
                let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
+               let latest_monitor_data = map_entry.pending_monitor_updates.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
                let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
-                       read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
+                       read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
                deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
                let mut ser = VecWriter(Vec::new());
                deserialized_monitor.write(&mut ser).unwrap();
-               *map_entry = (update.update_id, ser.0);
-               self.chain_monitor.update_channel(funding_txo, update)
+               let res = self.chain_monitor.update_channel(funding_txo, update);
+               match res {
+                       chain::ChannelMonitorUpdateStatus::Completed => {
+                               map_entry.pending_monitor_updates.clear();
+                               map_entry.persisted_monitor_id = update.update_id;
+                               map_entry.persisted_monitor = ser.0;
+                       },
+                       chain::ChannelMonitorUpdateStatus::InProgress => {
+                               map_entry.pending_monitor_updates.push((update.update_id, ser.0));
+                       },
+                       chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
+               }
+               res
        }
 
        fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
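
The status handling in `update_channel` above is the crux of the change:
`Completed` makes the new serialization the durable copy and empties the
pending queue, while `InProgress` queues the serialization until a later fuzz
input completes it. A self-contained sketch of that mapping, with a local enum
standing in for `chain::ChannelMonitorUpdateStatus` (stand-in types, not
LDK's):

    // Stand-in for chain::ChannelMonitorUpdateStatus; variant names match the diff.
    enum UpdateStatus { Completed, InProgress, UnrecoverableError }

    // Same stand-in state as in the earlier sketch.
    struct LatestMonitorState {
        persisted_monitor_id: u64,
        persisted_monitor: Vec<u8>,
        pending_monitor_updates: Vec<(u64, Vec<u8>)>,
    }

    fn record_update(state: &mut LatestMonitorState, update_id: u64, ser: Vec<u8>, res: UpdateStatus) {
        match res {
            // Persistence finished synchronously: the new serialization is
            // durable and nothing remains in flight.
            UpdateStatus::Completed => {
                state.pending_monitor_updates.clear();
                state.persisted_monitor_id = update_id;
                state.persisted_monitor = ser;
            },
            // Async persistence: queue until a later fuzz input completes it.
            UpdateStatus::InProgress => state.pending_monitor_updates.push((update_id, ser)),
            UpdateStatus::UnrecoverableError => panic!("persist failed unrecoverably"),
        }
    }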
@@ -507,9 +542,12 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
 
                        let mut monitors = new_hash_map();
                        let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
-                       for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
-                               monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
-                               chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
+                       for (outpoint, mut prev_state) in old_monitors.drain() {
+                               monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+                                       &mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
+                               ).expect("Failed to read monitor").1);
+                               prev_state.pending_monitor_updates.clear();
+                               chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
                        }
                        let mut monitor_refs = new_hash_map();
                        for (outpoint, monitor) in monitors.iter_mut() {
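
This reload hunk is where the stale monitor from the commit title
materializes: the restarted node deserializes only `persisted_monitor`, and
`pending_monitor_updates` is cleared, since a node that crashed before an
async persist completed would have lost those updates. Condensed as a
hypothetical helper over the stand-in type from the first sketch:

    // Hypothetical helper, not from the patch: reload sees only the durable copy.
    fn reload(prev: &mut LatestMonitorState) -> &Vec<u8> {
        prev.pending_monitor_updates.clear(); // in-flight persists died with the node
        &prev.persisted_monitor               // possibly several updates behind
    }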
@@ -1036,6 +1074,42 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                        } }
                }
 
+               let complete_monitor_update = |
+                       node: &ChannelManager<_, _, _, _, _, _, _, _>, monitor: &Arc<TestChainMonitor>,
+                       chan_funding, compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
+               | {
+                       if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+                               assert!(
+                                       state.pending_monitor_updates.windows(2).all(|pair| pair[0].0 < pair[1].0),
+                                       "updates should be sorted by id"
+                               );
+                               if let Some((id, data)) = compl_selector(&mut state.pending_monitor_updates) {
+                       monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+                                       if id > state.persisted_monitor_id {
+                                               state.persisted_monitor_id = id;
+                                               state.persisted_monitor = data;
+                                       }
+                                       node.process_monitor_events();
+                               }
+                       }
+               };
+
+               let complete_all_monitor_updates = |node: &ChannelManager<_, _, _, _, _, _, _, _>, monitor: &Arc<TestChainMonitor>, chan_funding| {
+                       if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+                               assert!(
+                                       state.pending_monitor_updates.windows(2).all(|pair| pair[0].0 < pair[1].0),
+                                       "updates should be sorted by id"
+                               );
+                               if let Some((id, data)) = state.pending_monitor_updates.pop() {
+                                       monitor.chain_monitor.force_channel_monitor_updated(*chan_funding, id);
+                                       state.persisted_monitor_id = id;
+                                       state.persisted_monitor = data;
+                                       node.process_monitor_events();
+                               }
+                               state.pending_monitor_updates.clear();
+                       }
+               };
+
                let v = get_slice!(1)[0];
                out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
                match v {
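
The two helpers just added replace the copy-pasted opcode bodies further down.
`complete_monitor_update` completes a single pending update chosen by a
caller-supplied selector; because updates may complete out of order, the
`id > state.persisted_monitor_id` guard keeps an older completion from
rolling the durable copy back past a newer one that already completed.
`complete_all_monitor_updates` completes the newest pending update via
`force_channel_monitor_updated`, which covers everything older, then drops the
rest of the queue. A toy demonstration of the ordering guard (made-up ids):

    fn main() {
        let mut persisted_id = 4u64;
        for completed_id in [6u64, 5] { // update 6's persist completes before update 5's
            if completed_id > persisted_id {
                persisted_id = completed_id; // update 5 arriving late must not overwrite this
            }
        }
        assert_eq!(persisted_id, 6);
    }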
@@ -1050,30 +1124,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                        0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
                        0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
 
-                       0x08 => {
-                               if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[0].process_monitor_events();
-                               }
-                       },
-                       0x09 => {
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                       },
-                       0x0a => {
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                       },
-                       0x0b => {
-                               if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[2].process_monitor_events();
-                               }
-                       },
+                       0x08 => complete_all_monitor_updates(&nodes[0], &monitor_a, &chan_1_funding),
+                       0x09 => complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_1_funding),
+                       0x0a => complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_2_funding),
+                       0x0b => complete_all_monitor_updates(&nodes[2], &monitor_c, &chan_2_funding),
 
                        0x0c => {
                                if !chan_a_disconnected {
@@ -1281,93 +1335,45 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                        },
                        0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },
 
-                       0xf0 => {
-                               let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[0].process_monitor_events();
-                       }
-                       0xf1 => {
-                               let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[0].process_monitor_events();
-                       }
-                       0xf2 => {
-                               let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[0].process_monitor_events();
-                       }
-
-                       0xf4 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xf5 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xf6 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-
-                       0xf8 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xf9 => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-                       0xfa => {
-                               let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[1].process_monitor_events();
-                       }
-
-                       0xfc => {
-                               let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(0) {
-                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[2].process_monitor_events();
-                       }
-                       0xfd => {
-                               let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.get(1) {
-                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[2].process_monitor_events();
-                       }
-                       0xfe => {
-                               let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-                               if let Some(id) = pending_updates.last() {
-                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-                               }
-                               nodes[2].process_monitor_events();
-                       }
+                       0xf0 =>
+                               complete_monitor_update(&nodes[0], &monitor_a, &chan_1_funding,
+                                       &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+                       0xf1 =>
+                               complete_monitor_update(&nodes[0], &monitor_a, &chan_1_funding,
+                                       &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+                       0xf2 =>
+                               complete_monitor_update(&nodes[0], &monitor_a, &chan_1_funding,
+                                       &|v: &mut Vec<_>| v.pop()),
+
+                       0xf4 =>
+                               complete_monitor_update(&nodes[1], &monitor_b, &chan_1_funding,
+                                       &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+                       0xf5 =>
+                               complete_monitor_update(&nodes[1], &monitor_b, &chan_1_funding,
+                                       &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+                       0xf6 =>
+                               complete_monitor_update(&nodes[1], &monitor_b, &chan_1_funding,
+                                       &|v: &mut Vec<_>| v.pop()),
+
+                       0xf8 =>
+                               complete_monitor_update(&nodes[1], &monitor_b, &chan_2_funding,
+                                       &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+                       0xf9 =>
+                               complete_monitor_update(&nodes[1], &monitor_b, &chan_2_funding,
+                                       &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+                       0xfa =>
+                               complete_monitor_update(&nodes[1], &monitor_b, &chan_2_funding,
+                                       &|v: &mut Vec<_>| v.pop()),
+
+                       0xfc =>
+                               complete_monitor_update(&nodes[2], &monitor_c, &chan_2_funding,
+                                       &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+                       0xfd =>
+                               complete_monitor_update(&nodes[2], &monitor_c, &chan_2_funding,
+                                       &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+                       0xfe =>
+                               complete_monitor_update(&nodes[2], &monitor_c, &chan_2_funding,
+                                       &|v: &mut Vec<_>| v.pop()),
 
                        0xff => {
                                // Test that no channel is in a stuck state where neither party can send funds even
@@ -1378,22 +1384,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
                                *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
                                *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
 
-                               if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[0].process_monitor_events();
-                               }
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                               if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[1].process_monitor_events();
-                               }
-                               if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                                       nodes[2].process_monitor_events();
-                               }
+                               complete_all_monitor_updates(&nodes[0], &monitor_a, &chan_1_funding);
+                               complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_1_funding);
+                               complete_all_monitor_updates(&nodes[1], &monitor_b, &chan_2_funding);
+                               complete_all_monitor_updates(&nodes[2], &monitor_c, &chan_2_funding);
 
                                // Next, make sure peers are all connected to each other
                                if chan_a_disconnected {
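
Taken together, the rewritten opcodes give the fuzzer fine-grained control
over persistence ordering: per (node, channel) pair, 0xf0/0xf4/0xf8/0xfc
complete the oldest pending update, 0xf1/0xf5/0xf9/0xfd skip ahead to the
second-oldest, 0xf2/0xf6/0xfa/0xfe complete the newest, and 0x08 through 0x0b
force-complete everything at once. Interleaved with the opcodes that set the
persisters' return status (0x04 through 0x06 set `Completed` above) and with a
reload, this exercises exactly the monitor-updates-not-completing-before-a-reload
scenario the commit message calls for.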