use util::byte_utils;
use util::events::Event;
-use std::collections::{HashMap, HashSet, hash_map};
+use std::collections::{HashMap, HashSet};
use std::{cmp, mem};
use std::io::Error;
use std::ops::Deref;
pub(crate) witness_data: InputMaterial
}
+/// An entry for an [`OnchainEvent`], stating the block height when the event was observed.
+///
+/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
+#[derive(PartialEq)]
+struct OnchainEventEntry {
+ height: u32,
+ event: OnchainEvent,
+}
+
+impl OnchainEventEntry {
+ fn confirmation_threshold(&self) -> u32 {
+ self.height + ANTI_REORG_DELAY - 1
+ }
+
+ fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
+ self.confirmation_threshold() == height
+ }
+}
+
/// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
/// once they mature to enough confirmations (ANTI_REORG_DELAY)
-#[derive(Clone, PartialEq)]
+#[derive(PartialEq)]
enum OnchainEvent {
/// HTLC output getting solved by a timeout, at maturation we pass upstream payment source information to solve
/// inbound HTLC in backward channel. Note, in case of preimage, we pass info to upstream without delay as we can
pending_monitor_events: Vec<MonitorEvent>,
pending_events: Vec<Event>,
- // Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
- // we have to take actions once they reach enough confs. Key is a block height timer, i.e we enforce
- // actions when we receive a block with given height. Actions depend on OnchainEvent type.
- onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+ // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
+ // which to take actions once they reach enough confirmations. Each entry includes the
+ // transaction's id and the height when the transaction was confirmed on chain.
+ onchain_events_waiting_threshold_conf: Vec<OnchainEventEntry>,
// If we get serialized out and re-read, we need to make sure that the chain monitoring
// interface knows about the TXOs that we want to be notified of spends of. We could probably
self.last_block_hash.write(writer)?;
writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
- for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
- writer.write_all(&byte_utils::be32_to_array(**target))?;
- writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
- for ev in events.iter() {
- match *ev {
- OnchainEvent::HTLCUpdate { ref htlc_update } => {
- 0u8.write(writer)?;
- htlc_update.0.write(writer)?;
- htlc_update.1.write(writer)?;
- },
- OnchainEvent::MaturingOutput { ref descriptor } => {
- 1u8.write(writer)?;
- descriptor.write(writer)?;
- },
- }
+ for ref entry in self.onchain_events_waiting_threshold_conf.iter() {
+ writer.write_all(&byte_utils::be32_to_array(entry.height))?;
+ match entry.event {
+ OnchainEvent::HTLCUpdate { ref htlc_update } => {
+ 0u8.write(writer)?;
+ htlc_update.0.write(writer)?;
+ htlc_update.1.write(writer)?;
+ },
+ OnchainEvent::MaturingOutput { ref descriptor } => {
+ 1u8.write(writer)?;
+ descriptor.write(writer)?;
+ },
}
}
pending_monitor_events: Vec::new(),
pending_events: Vec::new(),
- onchain_events_waiting_threshold_conf: HashMap::new(),
+ onchain_events_waiting_threshold_conf: Vec::new(),
outputs_to_watch,
onchain_tx_handler,
if let Some(ref outpoints) = self.counterparty_claimable_outpoints.get($txid) {
for &(ref htlc, ref source_option) in outpoints.iter() {
if let &Some(ref source) = source_option {
- log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
- match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- let e = entry.get_mut();
- e.retain(|ref event| {
- match **event {
- OnchainEvent::HTLCUpdate { ref htlc_update } => {
- return htlc_update.0 != **source
- },
- _ => true
- }
- });
- e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
- }
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
+ self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+ if entry.height != height { return true; }
+ match entry.event {
+ OnchainEvent::HTLCUpdate { ref htlc_update } => {
+ htlc_update.0 != **source
+ },
+ _ => true,
}
- }
+ });
+ let entry = OnchainEventEntry {
+ height,
+ event: OnchainEvent::HTLCUpdate {
+ htlc_update: ((**source).clone(), htlc.payment_hash.clone())
+ },
+ };
+ log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
}
}
}
log_trace!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of counterparty commitment transaction", log_bytes!(htlc.payment_hash.0), $commitment_tx);
- match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- let e = entry.get_mut();
- e.retain(|ref event| {
- match **event {
- OnchainEvent::HTLCUpdate { ref htlc_update } => {
- return htlc_update.0 != **source
- },
- _ => true
- }
- });
- e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
- }
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
+ self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+ if entry.height != height { return true; }
+ match entry.event {
+ OnchainEvent::HTLCUpdate { ref htlc_update } => {
+ htlc_update.0 != **source
+ },
+ _ => true,
}
- }
+ });
+ self.onchain_events_waiting_threshold_conf.push(OnchainEventEntry {
+ height,
+ event: OnchainEvent::HTLCUpdate {
+ htlc_update: ((**source).clone(), htlc.payment_hash.clone())
+ },
+ });
}
}
}
macro_rules! wait_threshold_conf {
($height: expr, $source: expr, $commitment_tx: expr, $payment_hash: expr) => {
- log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
- match self.onchain_events_waiting_threshold_conf.entry($height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- let e = entry.get_mut();
- e.retain(|ref event| {
- match **event {
- OnchainEvent::HTLCUpdate { ref htlc_update } => {
- return htlc_update.0 != $source
- },
- _ => true
- }
- });
- e.push(OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)});
+ self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+ if entry.height != $height { return true; }
+ match entry.event {
+ OnchainEvent::HTLCUpdate { ref htlc_update } => {
+ htlc_update.0 != $source
+ },
+ _ => true,
}
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}]);
- }
- }
+ });
+ let entry = OnchainEventEntry {
+ height,
+ event: OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash) },
+ };
+ log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
}
claimable_outpoints.append(&mut new_outpoints);
}
- if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
- for ev in events {
- match ev {
+
+ let onchain_events_waiting_threshold_conf =
+ self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
+ for entry in onchain_events_waiting_threshold_conf {
+ if entry.has_reached_confirmation_threshold(height) {
+ match entry.event {
OnchainEvent::HTLCUpdate { htlc_update } => {
log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
});
}
}
+ } else {
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
{
log_trace!(logger, "Block {} at height {} disconnected", header.block_hash(), height);
- if let Some(_) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
- //We may discard:
- //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
- //- maturing spendable output has transaction paying us has been disconnected
- }
+ //We may discard:
+ //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
+ //- maturing spendable output has transaction paying us has been disconnected
+ self.onchain_events_waiting_threshold_conf.retain(|ref entry| entry.height != height);
self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
}));
}
} else {
- log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
- match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- let e = entry.get_mut();
- e.retain(|ref event| {
- match **event {
- OnchainEvent::HTLCUpdate { ref htlc_update } => {
- return htlc_update.0 != source
- },
- _ => true
- }
- });
- e.push(OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)});
- }
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}]);
+ self.onchain_events_waiting_threshold_conf.retain(|ref entry| {
+ if entry.height != height { return true; }
+ match entry.event {
+ OnchainEvent::HTLCUpdate { ref htlc_update } => {
+ htlc_update.0 != source
+ },
+ _ => true,
}
- }
+ });
+ let entry = OnchainEventEntry {
+ height,
+ event: OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash) },
+ };
+ log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
}
}
}
if let Some(spendable_output) = spendable_output {
- log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), height + ANTI_REORG_DELAY - 1);
- match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- let e = entry.get_mut();
- e.push(OnchainEvent::MaturingOutput { descriptor: spendable_output });
- }
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![OnchainEvent::MaturingOutput { descriptor: spendable_output }]);
- }
- }
+ let entry = OnchainEventEntry {
+ height: height,
+ event: OnchainEvent::MaturingOutput { descriptor: spendable_output.clone() },
+ };
+ log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), entry.confirmation_threshold());
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
}
let last_block_hash: BlockHash = Readable::read(reader)?;
let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
- let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
+ let mut onchain_events_waiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
for _ in 0..waiting_threshold_conf_len {
- let height_target = Readable::read(reader)?;
- let events_len: u64 = Readable::read(reader)?;
- let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
- for _ in 0..events_len {
- let ev = match <u8 as Readable>::read(reader)? {
- 0 => {
- let htlc_source = Readable::read(reader)?;
- let hash = Readable::read(reader)?;
- OnchainEvent::HTLCUpdate {
- htlc_update: (htlc_source, hash)
- }
- },
- 1 => {
- let descriptor = Readable::read(reader)?;
- OnchainEvent::MaturingOutput {
- descriptor
- }
- },
- _ => return Err(DecodeError::InvalidValue),
- };
- events.push(ev);
- }
- onchain_events_waiting_threshold_conf.insert(height_target, events);
+ let height = Readable::read(reader)?;
+ let event = match <u8 as Readable>::read(reader)? {
+ 0 => {
+ let htlc_source = Readable::read(reader)?;
+ let hash = Readable::read(reader)?;
+ OnchainEvent::HTLCUpdate {
+ htlc_update: (htlc_source, hash)
+ }
+ },
+ 1 => {
+ let descriptor = Readable::read(reader)?;
+ OnchainEvent::MaturingOutput {
+ descriptor
+ }
+ },
+ _ => return Err(DecodeError::InvalidValue),
+ };
+ onchain_events_waiting_threshold_conf.push(OnchainEventEntry { height, event });
}
let outputs_to_watch_len: u64 = Readable::read(reader)?;
use util::ser::{Readable, ReadableArgs, Writer, Writeable, VecWriter};
use util::byte_utils;
-use std::collections::{HashMap, hash_map};
+use std::collections::HashMap;
use std::cmp;
use std::ops::Deref;
use std::mem::replace;
const MAX_ALLOC_SIZE: usize = 64*1024;
+/// An entry for an [`OnchainEvent`], stating the block height when the event was observed.
+///
+/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
+#[derive(PartialEq)]
+struct OnchainEventEntry {
+ height: u32,
+ event: OnchainEvent,
+}
+
+impl OnchainEventEntry {
+ fn confirmation_threshold(&self) -> u32 {
+ self.height + ANTI_REORG_DELAY - 1
+ }
+
+ fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
+ self.confirmation_threshold() == height
+ }
+}
+
/// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
/// once they mature to enough confirmations (ANTI_REORG_DELAY)
-#[derive(Clone, PartialEq)]
+#[derive(PartialEq)]
enum OnchainEvent {
/// Outpoint under claim process by our own tx, once this one get enough confirmations, we remove it from
/// bump-txn candidate buffer.
#[cfg(not(test))]
claimable_outpoints: HashMap<BitcoinOutPoint, (Txid, u32)>,
- onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+ onchain_events_waiting_threshold_conf: Vec<OnchainEventEntry>,
latest_height: u32,
}
writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
- for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
- writer.write_all(&byte_utils::be32_to_array(**target))?;
- writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
- for ev in events.iter() {
- match *ev {
- OnchainEvent::Claim { ref claim_request } => {
- writer.write_all(&[0; 1])?;
- claim_request.write(writer)?;
- },
- OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => {
- writer.write_all(&[1; 1])?;
- outpoint.write(writer)?;
- input_material.write(writer)?;
- }
+ for ref entry in self.onchain_events_waiting_threshold_conf.iter() {
+ writer.write_all(&byte_utils::be32_to_array(entry.height))?;
+ match entry.event {
+ OnchainEvent::Claim { ref claim_request } => {
+ writer.write_all(&[0; 1])?;
+ claim_request.write(writer)?;
+ },
+ OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => {
+ writer.write_all(&[1; 1])?;
+ outpoint.write(writer)?;
+ input_material.write(writer)?;
}
}
}
claimable_outpoints.insert(outpoint, (ancestor_claim_txid, height));
}
let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
- let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
+ let mut onchain_events_waiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
for _ in 0..waiting_threshold_conf_len {
- let height_target = Readable::read(reader)?;
- let events_len: u64 = Readable::read(reader)?;
- let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
- for _ in 0..events_len {
- let ev = match <u8 as Readable>::read(reader)? {
- 0 => {
- let claim_request = Readable::read(reader)?;
- OnchainEvent::Claim {
- claim_request
- }
- },
- 1 => {
- let outpoint = Readable::read(reader)?;
- let input_material = Readable::read(reader)?;
- OnchainEvent::ContentiousOutpoint {
- outpoint,
- input_material
- }
+ let height = Readable::read(reader)?;
+ let event = match <u8 as Readable>::read(reader)? {
+ 0 => {
+ let claim_request = Readable::read(reader)?;
+ OnchainEvent::Claim {
+ claim_request
}
- _ => return Err(DecodeError::InvalidValue),
- };
- events.push(ev);
- }
- onchain_events_waiting_threshold_conf.insert(height_target, events);
+ },
+ 1 => {
+ let outpoint = Readable::read(reader)?;
+ let input_material = Readable::read(reader)?;
+ OnchainEvent::ContentiousOutpoint {
+ outpoint,
+ input_material
+ }
+ }
+ _ => return Err(DecodeError::InvalidValue),
+ };
+ onchain_events_waiting_threshold_conf.push(OnchainEventEntry { height, event });
}
let latest_height = Readable::read(reader)?;
channel_transaction_parameters: channel_parameters,
pending_claim_requests: HashMap::new(),
claimable_outpoints: HashMap::new(),
- onchain_events_waiting_threshold_conf: HashMap::new(),
+ onchain_events_waiting_threshold_conf: Vec::new(),
latest_height: 0,
secp_ctx,
macro_rules! clean_claim_request_after_safety_delay {
() => {
- let new_event = OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() };
- match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- if !entry.get().contains(&new_event) {
- entry.get_mut().push(new_event);
- }
- },
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![new_event]);
- }
+ let entry = OnchainEventEntry {
+ height,
+ event: OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() }
+ };
+ if !self.onchain_events_waiting_threshold_conf.contains(&entry) {
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
}
}
}
for (outpoint, input_material) in claimed_outputs_material.drain(..) {
- let new_event = OnchainEvent::ContentiousOutpoint { outpoint, input_material };
- match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
- hash_map::Entry::Occupied(mut entry) => {
- if !entry.get().contains(&new_event) {
- entry.get_mut().push(new_event);
- }
- },
- hash_map::Entry::Vacant(entry) => {
- entry.insert(vec![new_event]);
- }
+ let entry = OnchainEventEntry {
+ height,
+ event: OnchainEvent::ContentiousOutpoint { outpoint, input_material },
+ };
+ if !self.onchain_events_waiting_threshold_conf.contains(&entry) {
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
}
// After security delay, either our claim tx got enough confs or outpoint is definetely out of reach
- if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
- for ev in events {
- match ev {
+ let onchain_events_waiting_threshold_conf =
+ self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
+ for entry in onchain_events_waiting_threshold_conf {
+ if entry.has_reached_confirmation_threshold(height) {
+ match entry.event {
OnchainEvent::Claim { claim_request } => {
// We may remove a whole set of claim outpoints here, as these one may have
// been aggregated in a single tx and claimed so atomically
self.claimable_outpoints.remove(&outpoint);
}
}
+ } else {
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
L::Target: Logger,
{
let mut bump_candidates = HashMap::new();
- if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
- //- our claim tx on a commitment tx output
- //- resurect outpoint back in its claimable set and regenerate tx
- for ev in events {
- match ev {
+ let onchain_events_waiting_threshold_conf =
+ self.onchain_events_waiting_threshold_conf.drain(..).collect::<Vec<_>>();
+ for entry in onchain_events_waiting_threshold_conf {
+ if entry.height == height {
+ //- our claim tx on a commitment tx output
+ //- resurect outpoint back in its claimable set and regenerate tx
+ match entry.event {
OnchainEvent::ContentiousOutpoint { outpoint, input_material } => {
if let Some(ancestor_claimable_txid) = self.claimable_outpoints.get(&outpoint) {
if let Some(claim_material) = self.pending_claim_requests.get_mut(&ancestor_claimable_txid.0) {
},
_ => {},
}
+ } else {
+ self.onchain_events_waiting_threshold_conf.push(entry);
}
}
for (_, claim_material) in bump_candidates.iter_mut() {