+/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
+/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
+/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
+/// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
+/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
+/// simply be replayed on startup.
+struct LatestMonitorState {
+ /// The latest monitor id which we told LDK we've persisted
+ persisted_monitor_id: u64,
+ /// The latest serialized `ChannelMonitor` that we told LDK we persisted.
+ persisted_monitor: Vec<u8>,
+ /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
+ /// from LDK's perspective.
+ pending_monitors: Vec<(u64, Vec<u8>)>,
struct TestChainMonitor {
pub logger: Arc<dyn Logger>,
pub keys: Arc<KeyProvider>,
pub persister: Arc<TestPersister>,
pub chain_monitor: Arc<chainmonitor::ChainMonitor<TestChannelSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
- // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
- // logic will automatically force-close our channels for us (as we don't have an up-to-date
- // monitor implying we are not able to punish misbehaving counterparties). Because this test
- // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
- // fully-serialized monitor state here, as well as the corresponding update_id.
- pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
+ pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
impl TestChainMonitor {
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
- if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+ let monitor_id = monitor.get_latest_update_id();
+ let res = self.chain_monitor.watch_channel(funding_txo, monitor);
+ let state = match res {
+ Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
+ LatestMonitorState {
+ persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
+ pending_monitors: Vec::new(),
+ }
+ },
+ Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
+ panic!("The test currently doesn't test initial-persistence via the async pipeline"),
+ Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
+ Err(()) => panic!(),
+ };
+ if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
panic!("Already had monitor pre-watch_channel");
- self.chain_monitor.watch_channel(funding_txo, monitor)
+ res
fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
+ let latest_monitor_data = map_entry.pending_monitors.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
- read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
+ read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
- *map_entry = (update.update_id, ser.0);
- self.chain_monitor.update_channel(funding_txo, update)
+ let res = self.chain_monitor.update_channel(funding_txo, update);
+ match res {
+ chain::ChannelMonitorUpdateStatus::Completed => {
+ map_entry.persisted_monitor_id = update.update_id;
+ map_entry.persisted_monitor = ser.0;
+ },
+ chain::ChannelMonitorUpdateStatus::InProgress => {
+ map_entry.pending_monitors.push((update.update_id, ser.0));
+ },
+ chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
+ }
+ res
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut monitors = new_hash_map();
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
- for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
- monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
- chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
+ for (outpoint, mut prev_state) in old_monitors.drain() {
+ monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+ &mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
+ ).expect("Failed to read monitor").1);
+ // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
+ // considering them discarded. LDK should replay these for us as they're stored in
+ // the `ChannelManager`.
+ prev_state.pending_monitors.clear();
+ chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
let mut monitor_refs = new_hash_map();
for (outpoint, monitor) in monitors.iter_mut() {
} }
+ let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
+ let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
+ let complete_monitor_update = |
+ monitor: &Arc<TestChainMonitor>, chan_funding,
+ compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
+ | {
+ if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+ assert!(
+ state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
+ "updates should be sorted by id"
+ );
+ if let Some((id, data)) = compl_selector(&mut state.pending_monitors) {
+ monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+ if id > state.persisted_monitor_id {
+ state.persisted_monitor_id = id;
+ state.persisted_monitor = data;
+ }
+ }
+ }
+ };
+ let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
+ if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+ assert!(
+ state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
+ "updates should be sorted by id"
+ );
+ for (id, data) in state.pending_monitors.drain(..) {
+ monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+ if id > state.persisted_monitor_id {
+ state.persisted_monitor_id = id;
+ state.persisted_monitor = data;
+ }
+ }
+ }
+ };
let v = get_slice!(1)[0];
out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
match v {
0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
- 0x08 => {
- if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
- monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
- nodes[0].process_monitor_events();
- }
- },
- 0x09 => {
- if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
- monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
- nodes[1].process_monitor_events();
- }
- },
- 0x0a => {
- if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
- monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
- nodes[1].process_monitor_events();
- }
- },
- 0x0b => {
- if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
- monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
- nodes[2].process_monitor_events();
- }
- },
+ 0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
+ 0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
+ 0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
+ 0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
0x0c => {
if !chan_a_disconnected {
0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },
- 0xf0 => {
- let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
- if let Some(id) = pending_updates.get(0) {
- monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
- }
- nodes[0].process_monitor_events();
- }
- 0xf1 => {
- let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
- if let Some(id) = pending_updates.get(1) {
- monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
- }
- nodes[0].process_monitor_events();
- }
- 0xf2 => {
- let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
- if let Some(id) = pending_updates.last() {
- monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
- }
- nodes[0].process_monitor_events();
- }
+ 0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
+ 0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
+ 0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),
- 0xf4 => {
- let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
- if let Some(id) = pending_updates.get(0) {
- monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
- }
- nodes[1].process_monitor_events();
- }
- 0xf5 => {
- let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
- if let Some(id) = pending_updates.get(1) {
- monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
- }
- nodes[1].process_monitor_events();
- }
- 0xf6 => {
- let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
- if let Some(id) = pending_updates.last() {
- monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
- }
- nodes[1].process_monitor_events();
- }
+ 0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
+ 0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
+ 0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),
- 0xf8 => {
- let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
- if let Some(id) = pending_updates.get(0) {
- monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
- }
- nodes[1].process_monitor_events();
- }
- 0xf9 => {
- let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
- if let Some(id) = pending_updates.get(1) {
- monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
- }
- nodes[1].process_monitor_events();
- }
- 0xfa => {
- let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
- if let Some(id) = pending_updates.last() {
- monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
- }
- nodes[1].process_monitor_events();
- }
+ 0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
+ 0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
+ 0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),
- 0xfc => {
- let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
- if let Some(id) = pending_updates.get(0) {
- monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
- }
- nodes[2].process_monitor_events();
- }
- 0xfd => {
- let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
- if let Some(id) = pending_updates.get(1) {
- monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
- }
- nodes[2].process_monitor_events();
- }
- 0xfe => {
- let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
- if let Some(id) = pending_updates.last() {
- monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
- }
- nodes[2].process_monitor_events();
- }
+ 0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
+ 0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
+ 0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),
0xff => {
// Test that no channel is in a stuck state where neither party can send funds even
// after we resolve all pending events.
- // First make sure there are no pending monitor updates, resetting the error state
- // and calling force_channel_monitor_updated for each monitor.
+ // First make sure there are no pending monitor updates and further update
+ // operations complete.
*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
- if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
- monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
- nodes[0].process_monitor_events();
- }
- if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
- monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
- nodes[1].process_monitor_events();
- }
- if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
- monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
- nodes[1].process_monitor_events();
- }
- if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
- monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
- nodes[2].process_monitor_events();
- }
+ complete_all_monitor_updates(&monitor_a, &chan_1_funding);
+ complete_all_monitor_updates(&monitor_b, &chan_1_funding);
+ complete_all_monitor_updates(&monitor_b, &chan_2_funding);
+ complete_all_monitor_updates(&monitor_c, &chan_2_funding);
// Next, make sure peers are all connected to each other
if chan_a_disconnected {