X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fonchaintx.rs;h=053502495268bb7c184a01ccd309a98ee89ccc10;hb=fdc11f2c76617bc05d8b2c052186f233a34dc709;hp=f3d8b2e9489d5078dc79dbc42be835fa54fe56f5;hpb=4894d52d30399c21b7994952a8de0d1d7848c58d;p=rust-lightning diff --git a/lightning/src/ln/onchaintx.rs b/lightning/src/ln/onchaintx.rs index f3d8b2e9..05350249 100644 --- a/lightning/src/ln/onchaintx.rs +++ b/lightning/src/ln/onchaintx.rs @@ -22,7 +22,7 @@ use bitcoin::secp256k1::{Secp256k1, Signature}; use bitcoin::secp256k1; use ln::msgs::DecodeError; -use ln::channelmanager::PaymentPreimage; +use ln::PaymentPreimage; use ln::chan_utils; use ln::chan_utils::{TxCreationKeys, ChannelTransactionParameters, HolderCommitmentTransaction}; use chain::chaininterface::{FeeEstimator, BroadcasterInterface, ConfirmationTarget, MIN_RELAY_FEE_SAT_PER_1000_WEIGHT}; @@ -32,16 +32,37 @@ use util::logger::Logger; use util::ser::{Readable, ReadableArgs, Writer, Writeable, VecWriter}; use util::byte_utils; -use std::collections::{HashMap, hash_map}; +use std::collections::HashMap; use std::cmp; use std::ops::Deref; use std::mem::replace; const MAX_ALLOC_SIZE: usize = 64*1024; +/// An entry for an [`OnchainEvent`], stating the block height when the event was observed and the +/// transaction causing it. +/// +/// Used to determine when the on-chain event can be considered safe from a chain reorganization. +#[derive(PartialEq)] +struct OnchainEventEntry { + txid: Txid, + height: u32, + event: OnchainEvent, +} + +impl OnchainEventEntry { + fn confirmation_threshold(&self) -> u32 { + self.height + ANTI_REORG_DELAY - 1 + } + + fn has_reached_confirmation_threshold(&self, height: u32) -> bool { + height >= self.confirmation_threshold() + } +} + /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it /// once they mature to enough confirmations (ANTI_REORG_DELAY) -#[derive(Clone, PartialEq)] +#[derive(PartialEq)] enum OnchainEvent { /// Outpoint under claim process by our own tx, once this one get enough confirmations, we remove it from /// bump-txn candidate buffer. @@ -280,7 +301,7 @@ pub struct OnchainTxHandler { #[cfg(not(test))] claimable_outpoints: HashMap, - onchain_events_waiting_threshold_conf: HashMap>, + onchain_events_awaiting_threshold_conf: Vec, latest_height: u32, @@ -317,21 +338,19 @@ impl OnchainTxHandler { claim_and_height.1.write(writer)?; } - writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?; - for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() { - writer.write_all(&byte_utils::be32_to_array(**target))?; - writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?; - for ev in events.iter() { - match *ev { - OnchainEvent::Claim { ref claim_request } => { - writer.write_all(&[0; 1])?; - claim_request.write(writer)?; - }, - OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => { - writer.write_all(&[1; 1])?; - outpoint.write(writer)?; - input_material.write(writer)?; - } + writer.write_all(&byte_utils::be64_to_array(self.onchain_events_awaiting_threshold_conf.len() as u64))?; + for ref entry in self.onchain_events_awaiting_threshold_conf.iter() { + entry.txid.write(writer)?; + writer.write_all(&byte_utils::be32_to_array(entry.height))?; + match entry.event { + OnchainEvent::Claim { ref claim_request } => { + writer.write_all(&[0; 1])?; + claim_request.write(writer)?; + }, + OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => { + writer.write_all(&[1; 1])?; + outpoint.write(writer)?; + input_material.write(writer)?; } } } @@ -377,32 +396,28 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler { claimable_outpoints.insert(outpoint, (ancestor_claim_txid, height)); } let waiting_threshold_conf_len: u64 = Readable::read(reader)?; - let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); + let mut onchain_events_awaiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); for _ in 0..waiting_threshold_conf_len { - let height_target = Readable::read(reader)?; - let events_len: u64 = Readable::read(reader)?; - let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128)); - for _ in 0..events_len { - let ev = match ::read(reader)? { - 0 => { - let claim_request = Readable::read(reader)?; - OnchainEvent::Claim { - claim_request - } - }, - 1 => { - let outpoint = Readable::read(reader)?; - let input_material = Readable::read(reader)?; - OnchainEvent::ContentiousOutpoint { - outpoint, - input_material - } + let txid = Readable::read(reader)?; + let height = Readable::read(reader)?; + let event = match ::read(reader)? { + 0 => { + let claim_request = Readable::read(reader)?; + OnchainEvent::Claim { + claim_request } - _ => return Err(DecodeError::InvalidValue), - }; - events.push(ev); - } - onchain_events_waiting_threshold_conf.insert(height_target, events); + }, + 1 => { + let outpoint = Readable::read(reader)?; + let input_material = Readable::read(reader)?; + OnchainEvent::ContentiousOutpoint { + outpoint, + input_material + } + } + _ => return Err(DecodeError::InvalidValue), + }; + onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid, height, event }); } let latest_height = Readable::read(reader)?; @@ -419,7 +434,7 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler { channel_transaction_parameters: channel_parameters, claimable_outpoints, pending_claim_requests, - onchain_events_waiting_threshold_conf, + onchain_events_awaiting_threshold_conf, latest_height, secp_ctx, }) @@ -438,7 +453,7 @@ impl OnchainTxHandler { channel_transaction_parameters: channel_parameters, pending_claim_requests: HashMap::new(), claimable_outpoints: HashMap::new(), - onchain_events_waiting_threshold_conf: HashMap::new(), + onchain_events_awaiting_threshold_conf: Vec::new(), latest_height: 0, secp_ctx, @@ -727,7 +742,7 @@ impl OnchainTxHandler { self.claimable_outpoints.insert(k.clone(), (txid, height)); } self.pending_claim_requests.insert(txid, claim_material); - log_trace!(logger, "Broadcast onchain {}", log_tx!(tx)); + log_info!(logger, "Broadcasting onchain {}", log_tx!(tx)); broadcaster.broadcast_transaction(&tx); } } @@ -756,16 +771,13 @@ impl OnchainTxHandler { macro_rules! clean_claim_request_after_safety_delay { () => { - let new_event = OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() }; - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - if !entry.get().contains(&new_event) { - entry.get_mut().push(new_event); - } - }, - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![new_event]); - } + let entry = OnchainEventEntry { + txid: tx.txid(), + height, + event: OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() } + }; + if !self.onchain_events_awaiting_threshold_conf.contains(&entry) { + self.onchain_events_awaiting_threshold_conf.push(entry); } } } @@ -799,24 +811,23 @@ impl OnchainTxHandler { } } for (outpoint, input_material) in claimed_outputs_material.drain(..) { - let new_event = OnchainEvent::ContentiousOutpoint { outpoint, input_material }; - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - if !entry.get().contains(&new_event) { - entry.get_mut().push(new_event); - } - }, - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![new_event]); - } + let entry = OnchainEventEntry { + txid: tx.txid(), + height, + event: OnchainEvent::ContentiousOutpoint { outpoint, input_material }, + }; + if !self.onchain_events_awaiting_threshold_conf.contains(&entry) { + self.onchain_events_awaiting_threshold_conf.push(entry); } } } // After security delay, either our claim tx got enough confs or outpoint is definetely out of reach - if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) { - for ev in events { - match ev { + let onchain_events_awaiting_threshold_conf = + self.onchain_events_awaiting_threshold_conf.drain(..).collect::>(); + for entry in onchain_events_awaiting_threshold_conf { + if entry.has_reached_confirmation_threshold(height) { + match entry.event { OnchainEvent::Claim { claim_request } => { // We may remove a whole set of claim outpoints here, as these one may have // been aggregated in a single tx and claimed so atomically @@ -830,13 +841,15 @@ impl OnchainTxHandler { self.claimable_outpoints.remove(&outpoint); } } + } else { + self.onchain_events_awaiting_threshold_conf.push(entry); } } // Check if any pending claim request must be rescheduled for (first_claim_txid, ref claim_data) in self.pending_claim_requests.iter() { - if let Some(h) = claim_data.height_timer { - if h == height { + if let Some(height_timer) = claim_data.height_timer { + if height >= height_timer { bump_candidates.insert(*first_claim_txid, (*claim_data).clone()); } } @@ -846,7 +859,7 @@ impl OnchainTxHandler { log_trace!(logger, "Bumping {} candidates", bump_candidates.len()); for (first_claim_txid, claim_material) in bump_candidates.iter() { if let Some((new_timer, new_feerate, bump_tx)) = self.generate_claim_tx(height, &claim_material, &*fee_estimator, &*logger) { - log_trace!(logger, "Broadcast onchain {}", log_tx!(bump_tx)); + log_info!(logger, "Broadcasting onchain {}", log_tx!(bump_tx)); broadcaster.broadcast_transaction(&bump_tx); if let Some(claim_material) = self.pending_claim_requests.get_mut(first_claim_txid) { claim_material.height_timer = new_timer; @@ -856,17 +869,43 @@ impl OnchainTxHandler { } } + pub(crate) fn transaction_unconfirmed( + &mut self, + txid: &Txid, + broadcaster: B, + fee_estimator: F, + logger: L, + ) where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + let mut height = None; + for entry in self.onchain_events_awaiting_threshold_conf.iter() { + if entry.txid == *txid { + height = Some(entry.height); + break; + } + } + + if let Some(height) = height { + self.block_disconnected(height, broadcaster, fee_estimator, logger); + } + } + pub(crate) fn block_disconnected(&mut self, height: u32, broadcaster: B, fee_estimator: F, logger: L) where B::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { let mut bump_candidates = HashMap::new(); - if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) { - //- our claim tx on a commitment tx output - //- resurect outpoint back in its claimable set and regenerate tx - for ev in events { - match ev { + let onchain_events_awaiting_threshold_conf = + self.onchain_events_awaiting_threshold_conf.drain(..).collect::>(); + for entry in onchain_events_awaiting_threshold_conf { + if entry.height >= height { + //- our claim tx on a commitment tx output + //- resurect outpoint back in its claimable set and regenerate tx + match entry.event { OnchainEvent::ContentiousOutpoint { outpoint, input_material } => { if let Some(ancestor_claimable_txid) = self.claimable_outpoints.get(&outpoint) { if let Some(claim_material) = self.pending_claim_requests.get_mut(&ancestor_claimable_txid.0) { @@ -879,12 +918,15 @@ impl OnchainTxHandler { }, _ => {}, } + } else { + self.onchain_events_awaiting_threshold_conf.push(entry); } } for (_, claim_material) in bump_candidates.iter_mut() { if let Some((new_timer, new_feerate, bump_tx)) = self.generate_claim_tx(height, &claim_material, &&*fee_estimator, &&*logger) { claim_material.height_timer = new_timer; claim_material.feerate_previous = new_feerate; + log_info!(logger, "Broadcasting onchain {}", log_tx!(bump_tx)); broadcaster.broadcast_transaction(&bump_tx); } } @@ -895,7 +937,7 @@ impl OnchainTxHandler { // right now if one of the outpoint get disconnected, just erase whole pending claim request. let mut remove_request = Vec::new(); self.claimable_outpoints.retain(|_, ref v| - if v.1 == height { + if v.1 >= height { remove_request.push(v.0.clone()); false } else { true }); @@ -904,6 +946,16 @@ impl OnchainTxHandler { } } + pub(crate) fn get_relevant_txids(&self) -> Vec { + let mut txids: Vec = self.onchain_events_awaiting_threshold_conf + .iter() + .map(|entry| entry.txid) + .collect(); + txids.sort_unstable(); + txids.dedup(); + txids + } + pub(crate) fn provide_latest_holder_tx(&mut self, tx: HolderCommitmentTransaction) { self.prev_holder_commitment = Some(replace(&mut self.holder_commitment, tx)); self.holder_htlc_sigs = None;