Flatten onchain_events_waiting_threshold_conf
authorJeffrey Czyz <jkczyz@gmail.com>
Wed, 31 Mar 2021 17:23:57 +0000 (13:23 -0400)
committerJeffrey Czyz <jkczyz@gmail.com>
Wed, 14 Apr 2021 19:57:04 +0000 (12:57 -0700)
Rather than mapping height to a vector of events, use a single vector
for all events. This allows for easily processing events by either
height or transaction. The latter will be used for an interface suitable
for Electrum.

lightning/src/chain/channelmonitor.rs
lightning/src/ln/onchaintx.rs

index 939337d7b42d572d64c7ab5b1a6224479f6c1ce9..51462b6dd4edcbc9c2fb857b7dc78f78179abb48 100644 (file)
@@ -50,7 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48};
 use util::byte_utils;
 use util::events::Event;
 
-use std::collections::{HashMap, HashSet, hash_map};
+use std::collections::{HashMap, HashSet};
 use std::{cmp, mem};
 use std::io::Error;
 use std::ops::Deref;
@@ -465,9 +465,28 @@ pub(crate) struct ClaimRequest {
        pub(crate) witness_data: InputMaterial
 }
 
+/// An entry for an [`OnchainEvent`], stating the block height when the event was observed.
+///
+/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
+#[derive(PartialEq)]
+struct OnchainEventEntry {
+       height: u32,
+       event: OnchainEvent,
+}
+
+impl OnchainEventEntry {
+       fn confirmation_threshold(&self) -> u32 {
+               self.height + ANTI_REORG_DELAY - 1
+       }
+
+       fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
+               self.confirmation_threshold() == height
+       }
+}
+
 /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
 /// once they mature to enough confirmations (ANTI_REORG_DELAY)
-#[derive(Clone, PartialEq)]
+#[derive(PartialEq)]
 enum OnchainEvent {
        /// HTLC output getting solved by a timeout, at maturation we pass upstream payment source information to solve
        /// inbound HTLC in backward channel. Note, in case of preimage, we pass info to upstream without delay as we can
@@ -684,10 +703,10 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
        pending_monitor_events: Vec<MonitorEvent>,
        pending_events: Vec<Event>,
 
-       // Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
-       // we have to take actions once they reach enough confs. Key is a block height timer, i.e we enforce
-       // actions when we receive a block with given height. Actions depend on OnchainEvent type.
-       onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+       // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
+       // which to take actions once they reach enough confirmations. Each entry includes the
+       // transaction's id and the height when the transaction was confirmed on chain.
+       onchain_events_waiting_threshold_conf: Vec<OnchainEventEntry>,
 
        // If we get serialized out and re-read, we need to make sure that the chain monitoring
        // interface knows about the TXOs that we want to be notified of spends of. We could probably
@@ -934,21 +953,18 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
                self.last_block_hash.write(writer)?;
 
                writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
-               for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
-                       writer.write_all(&byte_utils::be32_to_array(**target))?;
-                       writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
-                       for ev in events.iter() {
-                               match *ev {
-                                       OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                               0u8.write(writer)?;
-                                               htlc_update.0.write(writer)?;
-                                               htlc_update.1.write(writer)?;
-                                       },
-                                       OnchainEvent::MaturingOutput { ref descriptor } => {
-                                               1u8.write(writer)?;
-                                               descriptor.write(writer)?;
-                                       },
-                               }
+               for ref entry in self.onchain_events_waiting_threshold_conf.iter() {
+                       writer.write_all(&byte_utils::be32_to_array(entry.height))?;
+                       match entry.event {
+                               OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                       0u8.write(writer)?;
+                                       htlc_update.0.write(writer)?;
+                                       htlc_update.1.write(writer)?;
+                               },
+                               OnchainEvent::MaturingOutput { ref descriptor } => {
+                                       1u8.write(writer)?;
+                                       descriptor.write(writer)?;
+                               },
                        }
                }
 
@@ -1056,7 +1072,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                                pending_monitor_events: Vec::new(),
                                pending_events: Vec::new(),
 
-                               onchain_events_waiting_threshold_conf: HashMap::new(),
+                               onchain_events_waiting_threshold_conf: Vec::new(),
                                outputs_to_watch,
 
                                onchain_tx_handler,
@@ -1639,24 +1655,23 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                if let Some(ref outpoints) = self.counterparty_claimable_outpoints.get($txid) {
                                                        for &(ref htlc, ref source_option) in outpoints.iter() {
                                                                if let &Some(ref source) = source_option {
-                                                                       log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
-                                                                       match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                                                               hash_map::Entry::Occupied(mut entry) => {
-                                                                                       let e = entry.get_mut();
-                                                                                       e.retain(|ref event| {
-                                                                                               match **event {
-                                                                                                       OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                                                               return htlc_update.0 != **source
-                                                                                                       },
-                                                                                                       _ => true
-                                                                                               }
-                                                                                       });
-                                                                                       e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
-                                                                               }
-                                                                               hash_map::Entry::Vacant(entry) => {
-                                                                                       entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
+                                                                       self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+                                                                               if entry.height != height { return true; }
+                                                                               match entry.event {
+                                                                                        OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                                                                htlc_update.0 != **source
+                                                                                        },
+                                                                                        _ => true,
                                                                                }
-                                                                       }
+                                                                       });
+                                                                       let entry = OnchainEventEntry {
+                                                                               height,
+                                                                               event: OnchainEvent::HTLCUpdate {
+                                                                                       htlc_update: ((**source).clone(), htlc.payment_hash.clone())
+                                                                               },
+                                                                       };
+                                                                       log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+                                                                       self.onchain_events_waiting_threshold_conf.push(entry);
                                                                }
                                                        }
                                                }
@@ -1705,23 +1720,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                                        }
                                                                }
                                                                log_trace!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of counterparty commitment transaction", log_bytes!(htlc.payment_hash.0), $commitment_tx);
-                                                               match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                                                       hash_map::Entry::Occupied(mut entry) => {
-                                                                               let e = entry.get_mut();
-                                                                               e.retain(|ref event| {
-                                                                                       match **event {
-                                                                                               OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                                                       return htlc_update.0 != **source
-                                                                                               },
-                                                                                               _ => true
-                                                                                       }
-                                                                               });
-                                                                               e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
-                                                                       }
-                                                                       hash_map::Entry::Vacant(entry) => {
-                                                                               entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
+                                                               self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+                                                                       if entry.height != height { return true; }
+                                                                       match entry.event {
+                                                                                OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                                                        htlc_update.0 != **source
+                                                                                },
+                                                                                _ => true,
                                                                        }
-                                                               }
+                                                               });
+                                                               self.onchain_events_waiting_threshold_conf.push(OnchainEventEntry {
+                                                                       height,
+                                                                       event: OnchainEvent::HTLCUpdate {
+                                                                               htlc_update: ((**source).clone(), htlc.payment_hash.clone())
+                                                                       },
+                                                               });
                                                        }
                                                }
                                        }
@@ -1862,24 +1875,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
 
                macro_rules! wait_threshold_conf {
                        ($height: expr, $source: expr, $commitment_tx: expr, $payment_hash: expr) => {
-                               log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
-                               match self.onchain_events_waiting_threshold_conf.entry($height + ANTI_REORG_DELAY - 1) {
-                                       hash_map::Entry::Occupied(mut entry) => {
-                                               let e = entry.get_mut();
-                                               e.retain(|ref event| {
-                                                       match **event {
-                                                               OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                       return htlc_update.0 != $source
-                                                               },
-                                                               _ => true
-                                                       }
-                                               });
-                                               e.push(OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)});
+                               self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+                                       if entry.height != $height { return true; }
+                                       match entry.event {
+                                                OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                        htlc_update.0 != $source
+                                                },
+                                                _ => true,
                                        }
-                                       hash_map::Entry::Vacant(entry) => {
-                                               entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}]);
-                                       }
-                               }
+                               });
+                               let entry = OnchainEventEntry {
+                                       height,
+                                       event: OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash) },
+                               };
+                               log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+                               self.onchain_events_waiting_threshold_conf.push(entry);
                        }
                }
 
@@ -2054,9 +2064,12 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                        }
                        claimable_outpoints.append(&mut new_outpoints);
                }
-               if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
-                       for ev in events {
-                               match ev {
+
+               let onchain_events_waiting_threshold_conf =
+                       self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
+               for entry in onchain_events_waiting_threshold_conf {
+                       if entry.has_reached_confirmation_threshold(height) {
+                               match entry.event {
                                        OnchainEvent::HTLCUpdate { htlc_update } => {
                                                log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
                                                self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
@@ -2072,6 +2085,8 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                });
                                        }
                                }
+                       } else {
+                               self.onchain_events_waiting_threshold_conf.push(entry);
                        }
                }
 
@@ -2108,11 +2123,10 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
        {
                log_trace!(logger, "Block {} at height {} disconnected", header.block_hash(), height);
 
-               if let Some(_) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
-                       //We may discard:
-                       //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
-                       //- maturing spendable output has transaction paying us has been disconnected
-               }
+               //We may discard:
+               //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
+               //- maturing spendable output has transaction paying us has been disconnected
+               self.onchain_events_waiting_threshold_conf.retain(|ref entry| entry.height != height);
 
                self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
 
@@ -2344,24 +2358,21 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                }));
                                        }
                                } else {
-                                       log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
-                                       match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                               hash_map::Entry::Occupied(mut entry) => {
-                                                       let e = entry.get_mut();
-                                                       e.retain(|ref event| {
-                                                               match **event {
-                                                                       OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                               return htlc_update.0 != source
-                                                                       },
-                                                                       _ => true
-                                                               }
-                                                       });
-                                                       e.push(OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)});
-                                               }
-                                               hash_map::Entry::Vacant(entry) => {
-                                                       entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}]);
+                                       self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+                                               if entry.height != height { return true; }
+                                               match entry.event {
+                                                        OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                                htlc_update.0 != source
+                                                        },
+                                                        _ => true,
                                                }
-                                       }
+                                       });
+                                       let entry = OnchainEventEntry {
+                                               height,
+                                               event: OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash) },
+                                       };
+                                       log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
+                                       self.onchain_events_waiting_threshold_conf.push(entry);
                                }
                        }
                }
@@ -2420,16 +2431,12 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                        }
                }
                if let Some(spendable_output) = spendable_output {
-                       log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), height + ANTI_REORG_DELAY - 1);
-                       match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                               hash_map::Entry::Occupied(mut entry) => {
-                                       let e = entry.get_mut();
-                                       e.push(OnchainEvent::MaturingOutput { descriptor: spendable_output });
-                               }
-                               hash_map::Entry::Vacant(entry) => {
-                                       entry.insert(vec![OnchainEvent::MaturingOutput { descriptor: spendable_output }]);
-                               }
-                       }
+                       let entry = OnchainEventEntry {
+                               height: height,
+                               event: OnchainEvent::MaturingOutput { descriptor: spendable_output.clone() },
+                       };
+                       log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), entry.confirmation_threshold());
+                       self.onchain_events_waiting_threshold_conf.push(entry);
                }
        }
 }
@@ -2695,31 +2702,26 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                let last_block_hash: BlockHash = Readable::read(reader)?;
 
                let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
-               let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
+               let mut onchain_events_waiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
                for _ in 0..waiting_threshold_conf_len {
-                       let height_target = Readable::read(reader)?;
-                       let events_len: u64 = Readable::read(reader)?;
-                       let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
-                       for _ in 0..events_len {
-                               let ev = match <u8 as Readable>::read(reader)? {
-                                       0 => {
-                                               let htlc_source = Readable::read(reader)?;
-                                               let hash = Readable::read(reader)?;
-                                               OnchainEvent::HTLCUpdate {
-                                                       htlc_update: (htlc_source, hash)
-                                               }
-                                       },
-                                       1 => {
-                                               let descriptor = Readable::read(reader)?;
-                                               OnchainEvent::MaturingOutput {
-                                                       descriptor
-                                               }
-                                       },
-                                       _ => return Err(DecodeError::InvalidValue),
-                               };
-                               events.push(ev);
-                       }
-                       onchain_events_waiting_threshold_conf.insert(height_target, events);
+                       let height = Readable::read(reader)?;
+                       let event = match <u8 as Readable>::read(reader)? {
+                               0 => {
+                                       let htlc_source = Readable::read(reader)?;
+                                       let hash = Readable::read(reader)?;
+                                       OnchainEvent::HTLCUpdate {
+                                               htlc_update: (htlc_source, hash)
+                                       }
+                               },
+                               1 => {
+                                       let descriptor = Readable::read(reader)?;
+                                       OnchainEvent::MaturingOutput {
+                                               descriptor
+                                       }
+                               },
+                               _ => return Err(DecodeError::InvalidValue),
+                       };
+                       onchain_events_waiting_threshold_conf.push(OnchainEventEntry { height, event });
                }
 
                let outputs_to_watch_len: u64 = Readable::read(reader)?;
index f3d8b2e9489d5078dc79dbc42be835fa54fe56f5..83dc968c6ca549cdb8a4f49978445f583664490b 100644 (file)
@@ -32,16 +32,35 @@ use util::logger::Logger;
 use util::ser::{Readable, ReadableArgs, Writer, Writeable, VecWriter};
 use util::byte_utils;
 
-use std::collections::{HashMap, hash_map};
+use std::collections::HashMap;
 use std::cmp;
 use std::ops::Deref;
 use std::mem::replace;
 
 const MAX_ALLOC_SIZE: usize = 64*1024;
 
+/// An entry for an [`OnchainEvent`], stating the block height when the event was observed.
+///
+/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
+#[derive(PartialEq)]
+struct OnchainEventEntry {
+       height: u32,
+       event: OnchainEvent,
+}
+
+impl OnchainEventEntry {
+       fn confirmation_threshold(&self) -> u32 {
+               self.height + ANTI_REORG_DELAY - 1
+       }
+
+       fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
+               self.confirmation_threshold() == height
+       }
+}
+
 /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
 /// once they mature to enough confirmations (ANTI_REORG_DELAY)
-#[derive(Clone, PartialEq)]
+#[derive(PartialEq)]
 enum OnchainEvent {
        /// Outpoint under claim process by our own tx, once this one get enough confirmations, we remove it from
        /// bump-txn candidate buffer.
@@ -280,7 +299,7 @@ pub struct OnchainTxHandler<ChannelSigner: Sign> {
        #[cfg(not(test))]
        claimable_outpoints: HashMap<BitcoinOutPoint, (Txid, u32)>,
 
-       onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+       onchain_events_waiting_threshold_conf: Vec<OnchainEventEntry>,
 
        latest_height: u32,
 
@@ -318,20 +337,17 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                }
 
                writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
-               for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
-                       writer.write_all(&byte_utils::be32_to_array(**target))?;
-                       writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
-                       for ev in events.iter() {
-                               match *ev {
-                                       OnchainEvent::Claim { ref claim_request } => {
-                                               writer.write_all(&[0; 1])?;
-                                               claim_request.write(writer)?;
-                                       },
-                                       OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => {
-                                               writer.write_all(&[1; 1])?;
-                                               outpoint.write(writer)?;
-                                               input_material.write(writer)?;
-                                       }
+               for ref entry in self.onchain_events_waiting_threshold_conf.iter() {
+                       writer.write_all(&byte_utils::be32_to_array(entry.height))?;
+                       match entry.event {
+                               OnchainEvent::Claim { ref claim_request } => {
+                                       writer.write_all(&[0; 1])?;
+                                       claim_request.write(writer)?;
+                               },
+                               OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => {
+                                       writer.write_all(&[1; 1])?;
+                                       outpoint.write(writer)?;
+                                       input_material.write(writer)?;
                                }
                        }
                }
@@ -377,32 +393,27 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler<K::Signer> {
                        claimable_outpoints.insert(outpoint, (ancestor_claim_txid, height));
                }
                let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
-               let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
+               let mut onchain_events_waiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
                for _ in 0..waiting_threshold_conf_len {
-                       let height_target = Readable::read(reader)?;
-                       let events_len: u64 = Readable::read(reader)?;
-                       let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
-                       for _ in 0..events_len {
-                               let ev = match <u8 as Readable>::read(reader)? {
-                                       0 => {
-                                               let claim_request = Readable::read(reader)?;
-                                               OnchainEvent::Claim {
-                                                       claim_request
-                                               }
-                                       },
-                                       1 => {
-                                               let outpoint = Readable::read(reader)?;
-                                               let input_material = Readable::read(reader)?;
-                                               OnchainEvent::ContentiousOutpoint {
-                                                       outpoint,
-                                                       input_material
-                                               }
+                       let height = Readable::read(reader)?;
+                       let event = match <u8 as Readable>::read(reader)? {
+                               0 => {
+                                       let claim_request = Readable::read(reader)?;
+                                       OnchainEvent::Claim {
+                                               claim_request
                                        }
-                                       _ => return Err(DecodeError::InvalidValue),
-                               };
-                               events.push(ev);
-                       }
-                       onchain_events_waiting_threshold_conf.insert(height_target, events);
+                               },
+                               1 => {
+                                       let outpoint = Readable::read(reader)?;
+                                       let input_material = Readable::read(reader)?;
+                                       OnchainEvent::ContentiousOutpoint {
+                                               outpoint,
+                                               input_material
+                                       }
+                               }
+                               _ => return Err(DecodeError::InvalidValue),
+                       };
+                       onchain_events_waiting_threshold_conf.push(OnchainEventEntry { height, event });
                }
                let latest_height = Readable::read(reader)?;
 
@@ -438,7 +449,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                        channel_transaction_parameters: channel_parameters,
                        pending_claim_requests: HashMap::new(),
                        claimable_outpoints: HashMap::new(),
-                       onchain_events_waiting_threshold_conf: HashMap::new(),
+                       onchain_events_waiting_threshold_conf: Vec::new(),
                        latest_height: 0,
 
                        secp_ctx,
@@ -756,16 +767,12 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
 
                                                macro_rules! clean_claim_request_after_safety_delay {
                                                        () => {
-                                                               let new_event = OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() };
-                                                               match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                                                       hash_map::Entry::Occupied(mut entry) => {
-                                                                               if !entry.get().contains(&new_event) {
-                                                                                       entry.get_mut().push(new_event);
-                                                                               }
-                                                                       },
-                                                                       hash_map::Entry::Vacant(entry) => {
-                                                                               entry.insert(vec![new_event]);
-                                                                       }
+                                                               let entry = OnchainEventEntry {
+                                                                       height,
+                                                                       event: OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() }
+                                                               };
+                                                               if !self.onchain_events_waiting_threshold_conf.contains(&entry) {
+                                                                       self.onchain_events_waiting_threshold_conf.push(entry);
                                                                }
                                                        }
                                                }
@@ -799,24 +806,22 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                }
                        }
                        for (outpoint, input_material) in claimed_outputs_material.drain(..) {
-                               let new_event = OnchainEvent::ContentiousOutpoint { outpoint, input_material };
-                               match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                       hash_map::Entry::Occupied(mut entry) => {
-                                               if !entry.get().contains(&new_event) {
-                                                       entry.get_mut().push(new_event);
-                                               }
-                                       },
-                                       hash_map::Entry::Vacant(entry) => {
-                                               entry.insert(vec![new_event]);
-                                       }
+                               let entry = OnchainEventEntry {
+                                       height,
+                                       event: OnchainEvent::ContentiousOutpoint { outpoint, input_material },
+                               };
+                               if !self.onchain_events_waiting_threshold_conf.contains(&entry) {
+                                       self.onchain_events_waiting_threshold_conf.push(entry);
                                }
                        }
                }
 
                // After security delay, either our claim tx got enough confs or outpoint is definetely out of reach
-               if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
-                       for ev in events {
-                               match ev {
+               let onchain_events_waiting_threshold_conf =
+                       self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
+               for entry in onchain_events_waiting_threshold_conf {
+                       if entry.has_reached_confirmation_threshold(height) {
+                               match entry.event {
                                        OnchainEvent::Claim { claim_request } => {
                                                // We may remove a whole set of claim outpoints here, as these one may have
                                                // been aggregated in a single tx and claimed so atomically
@@ -830,6 +835,8 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                                self.claimable_outpoints.remove(&outpoint);
                                        }
                                }
+                       } else {
+                               self.onchain_events_waiting_threshold_conf.push(entry);
                        }
                }
 
@@ -862,11 +869,13 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                        L::Target: Logger,
        {
                let mut bump_candidates = HashMap::new();
-               if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
-                       //- our claim tx on a commitment tx output
-                       //- resurect outpoint back in its claimable set and regenerate tx
-                       for ev in events {
-                               match ev {
+               let onchain_events_waiting_threshold_conf =
+                       self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
+               for entry in onchain_events_waiting_threshold_conf {
+                       if entry.height == height {
+                               //- our claim tx on a commitment tx output
+                               //- resurect outpoint back in its claimable set and regenerate tx
+                               match entry.event {
                                        OnchainEvent::ContentiousOutpoint { outpoint, input_material } => {
                                                if let Some(ancestor_claimable_txid) = self.claimable_outpoints.get(&outpoint) {
                                                        if let Some(claim_material) = self.pending_claim_requests.get_mut(&ancestor_claimable_txid.0) {
@@ -879,6 +888,8 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                        },
                                        _ => {},
                                }
+                       } else {
+                               self.onchain_events_waiting_threshold_conf.push(entry);
                        }
                }
                for (_, claim_material) in bump_candidates.iter_mut() {