X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmonitor.rs;h=1bc8c76b49e82765f73e550c41e17c3243a027fc;hb=39b62335b737e0ad40fc76aefb3d5d24ef64497a;hp=bbcbe75c67bf43bc78c0069e734c11ecff3ee447;hpb=ab7a0a54318cdd55bedc3a02604af200b16e2e2c;p=rust-lightning diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index bbcbe75c..1bc8c76b 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -37,7 +37,7 @@ use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInter use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; use util::logger::Logger; -use util::ser::{ReadableArgs, Readable, Writer, Writeable, U48}; +use util::ser::{ReadableArgs, Readable, MaybeReadable, Writer, Writeable, U48}; use util::{byte_utils, events}; use std::collections::{HashMap, hash_map, HashSet}; @@ -212,33 +212,31 @@ pub trait ManyChannelMonitor: Send + Sync { /// /// If you're using this for local monitoring of your own channels, you probably want to use /// `OutPoint` as the key, which will give you a ManyChannelMonitor implementation. -pub struct SimpleManyChannelMonitor where T::Target: BroadcasterInterface { +pub struct SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator +{ #[cfg(test)] // Used in ChannelManager tests to manipulate channels directly pub monitors: Mutex>>, #[cfg(not(test))] monitors: Mutex>>, chain_monitor: Arc, broadcaster: T, - pending_events: Mutex>, logger: Arc, - fee_estimator: Arc + fee_estimator: F } -impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send> ChainListener for SimpleManyChannelMonitor - where T::Target: BroadcasterInterface +impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send> + ChainListener for SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) { let block_hash = header.bitcoin_hash(); - let mut new_events: Vec = Vec::with_capacity(0); { let mut monitors = self.monitors.lock().unwrap(); for monitor in monitors.values_mut() { - let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator); - if spendable_outputs.len() > 0 { - new_events.push(events::Event::SpendableOutputs { - outputs: spendable_outputs, - }); - } + let txn_outputs = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator); for (ref txid, ref outputs) in txn_outputs { for (idx, output) in outputs.iter().enumerate() { @@ -247,8 +245,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + } } } - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.append(&mut new_events); } fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { @@ -260,17 +256,17 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + } } -impl SimpleManyChannelMonitor - where T::Target: BroadcasterInterface +impl SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator { /// Creates a new object which can be used to monitor several channels given the chain /// interface with which to register to receive notifications. - pub fn new(chain_monitor: Arc, broadcaster: T, logger: Arc, feeest: Arc) -> SimpleManyChannelMonitor { + pub fn new(chain_monitor: Arc, broadcaster: T, logger: Arc, feeest: F) -> SimpleManyChannelMonitor { let res = SimpleManyChannelMonitor { monitors: Mutex::new(HashMap::new()), chain_monitor, broadcaster, - pending_events: Mutex::new(Vec::new()), logger, fee_estimator: feeest, }; @@ -324,8 +320,9 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor - where T::Target: BroadcasterInterface +impl ManyChannelMonitor for SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator { fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { match self.add_monitor_by_key(funding_txo, monitor) { @@ -350,14 +347,16 @@ impl ManyChannelMonitor events::EventsProvider for SimpleManyChannelMonitor - where T::Target: BroadcasterInterface +impl events::EventsProvider for SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator { fn get_and_clear_pending_events(&self) -> Vec { - let mut pending_events = self.pending_events.lock().unwrap(); - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut *pending_events); - ret + let mut pending_events = Vec::new(); + for chan in self.monitors.lock().unwrap().values_mut() { + pending_events.append(&mut chan.get_and_clear_pending_events()); + } + pending_events } } @@ -784,6 +783,11 @@ impl Readable for ChannelMonitorUpdateStep { /// /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. +/// +/// Pending Events or updated HTLCs which have not yet been read out by +/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and +/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events +/// gotten are fully handled before re-serializing the new state. pub struct ChannelMonitor { latest_update_id: u64, commitment_transaction_number_obscure_factor: u64, @@ -827,6 +831,7 @@ pub struct ChannelMonitor { payment_preimages: HashMap, pending_htlcs_updated: Vec, + pending_events: Vec, destination_script: Script, // Thanks to data loss protection, we may be able to claim our non-htlc funds @@ -940,6 +945,7 @@ impl PartialEq for ChannelMonitor { self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx || self.payment_preimages != other.payment_preimages || self.pending_htlcs_updated != other.pending_htlcs_updated || + self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly self.destination_script != other.destination_script || self.to_remote_rescue != other.to_remote_rescue || self.pending_claim_requests != other.pending_claim_requests || @@ -1127,6 +1133,11 @@ impl ChannelMonitor { data.write(writer)?; } + writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?; + for event in self.pending_events.iter() { + event.write(writer)?; + } + self.last_block_hash.write(writer)?; self.destination_script.write(writer)?; if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue { @@ -1259,6 +1270,7 @@ impl ChannelMonitor { payment_preimages: HashMap::new(), pending_htlcs_updated: Vec::new(), + pending_events: Vec::new(), destination_script: destination_script.clone(), to_remote_rescue: None, @@ -1325,7 +1337,6 @@ impl ChannelMonitor { // Prune HTLCs from the previous remote commitment tx so we don't generate failure/fulfill // events for now-revoked/fulfilled HTLCs. - // TODO: We should probably consider whether we're really getting the next secret here. if let Storage::Local { ref mut prev_remote_commitment_txid, .. } = self.key_storage { if let Some(txid) = prev_remote_commitment_txid.take() { for &mut (_, ref mut source) in self.remote_claimable_outpoints.get_mut(&txid).unwrap() { @@ -1553,6 +1564,18 @@ impl ChannelMonitor { ret } + /// Gets the list of pending events which were generated by previous actions, clearing the list + /// in the process. + /// + /// This is called by ManyChannelMonitor::get_and_clear_pending_events() and is equivalent to + /// EventsProvider::get_and_clear_pending_events() except that it requires &mut self as we do + /// no internal locking in ChannelMonitors. + pub fn get_and_clear_pending_events(&mut self) -> Vec { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_events); + ret + } + /// Can only fail if idx is < get_min_seen_secret pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> { self.commitment_secrets.get_secret(idx) @@ -1578,7 +1601,9 @@ impl ChannelMonitor { /// HTLC-Success/HTLC-Timeout transactions. /// Return updates for HTLC pending in the channel and failed automatically by the broadcast of /// revoked remote commitment tx - fn check_spend_remote_transaction(&mut self, tx: &Transaction, height: u32, fee_estimator: &FeeEstimator) -> (Vec, (Sha256dHash, Vec), Vec) { + fn check_spend_remote_transaction(&mut self, tx: &Transaction, height: u32, fee_estimator: F) -> (Vec, (Sha256dHash, Vec), Vec) + where F::Target: FeeEstimator + { // Most secp and related errors trying to create keys means we have no hope of constructing // a spend transaction...so we return no transactions to broadcast let mut txn_to_broadcast = Vec::new(); @@ -2153,7 +2178,9 @@ impl ChannelMonitor { } /// Attempts to claim a remote HTLC-Success/HTLC-Timeout's outputs using the revocation key - fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32, fee_estimator: &FeeEstimator) -> (Option, Option) { + fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32, fee_estimator: F) -> (Option, Option) + where F::Target: FeeEstimator + { //TODO: send back new outputs to guarantee pending_claim_request consistency if tx.input.len() != 1 || tx.output.len() != 1 { return (None, None) @@ -2523,8 +2550,9 @@ impl ChannelMonitor { /// Eventually this should be pub and, roughly, implement ChainListener, however this requires /// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of /// on-chain. - fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec)>, Vec) - where B::Target: BroadcasterInterface + fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F)-> Vec<(Sha256dHash, Vec)> + where B::Target: BroadcasterInterface, + F::Target: FeeEstimator { for tx in txn_matched { let mut output_val = 0; @@ -2557,7 +2585,7 @@ impl ChannelMonitor { }; if funding_txo.is_none() || (prevout.txid == funding_txo.as_ref().unwrap().0.txid && prevout.vout == funding_txo.as_ref().unwrap().0.index as u32) { if (tx.input[0].sequence >> 8*3) as u8 == 0x80 && (tx.lock_time >> 8*3) as u8 == 0x20 { - let (remote_txn, new_outputs, mut spendable_output) = self.check_spend_remote_transaction(&tx, height, fee_estimator); + let (remote_txn, new_outputs, mut spendable_output) = self.check_spend_remote_transaction(&tx, height, &*fee_estimator); txn = remote_txn; spendable_outputs.append(&mut spendable_output); if !new_outputs.1.is_empty() { @@ -2579,7 +2607,7 @@ impl ChannelMonitor { } } else { if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) { - let (tx, spendable_output) = self.check_spend_remote_htlc(&tx, commitment_number, height, fee_estimator); + let (tx, spendable_output) = self.check_spend_remote_htlc(&tx, commitment_number, height, &*fee_estimator); if let Some(tx) = tx { txn.push(tx); } @@ -2739,7 +2767,7 @@ impl ChannelMonitor { for first_claim_txid in bump_candidates.iter() { if let Some((new_timer, new_feerate)) = { if let Some(claim_material) = self.pending_claim_requests.get(first_claim_txid) { - if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, fee_estimator) { + if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, &*fee_estimator) { broadcaster.broadcast_transaction(&bump_tx); Some((new_timer, new_feerate)) } else { None } @@ -2755,11 +2783,19 @@ impl ChannelMonitor { for &(ref txid, ref output_scripts) in watch_outputs.iter() { self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect()); } - (watch_outputs, spendable_outputs) + + if spendable_outputs.len() > 0 { + self.pending_events.push(events::Event::SpendableOutputs { + outputs: spendable_outputs, + }); + } + + watch_outputs } - fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: &FeeEstimator) - where B::Target: BroadcasterInterface + fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: B, fee_estimator: F) + where B::Target: BroadcasterInterface, + F::Target: FeeEstimator { log_trace!(self, "Block {} at height {} disconnected", block_hash, height); let mut bump_candidates = HashMap::new(); @@ -2785,7 +2821,7 @@ impl ChannelMonitor { } } for (_, claim_material) in bump_candidates.iter_mut() { - if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, fee_estimator) { + if let Some((new_timer, new_feerate, bump_tx)) = self.bump_claim_tx(height, &claim_material, &*fee_estimator) { claim_material.height_timer = new_timer; claim_material.feerate_previous = new_feerate; broadcaster.broadcast_transaction(&bump_tx); @@ -3015,7 +3051,9 @@ impl ChannelMonitor { /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration /// (CSV or CLTV following cases). In case of high-fee spikes, claim tx may stuck in the mempool, so you need to bump its feerate quickly using Replace-By-Fee or Child-Pay-For-Parent. - fn bump_claim_tx(&self, height: u32, cached_claim_datas: &ClaimTxBumpMaterial, fee_estimator: &FeeEstimator) -> Option<(u32, u64, Transaction)> { + fn bump_claim_tx(&self, height: u32, cached_claim_datas: &ClaimTxBumpMaterial, fee_estimator: F) -> Option<(u32, u64, Transaction)> + where F::Target: FeeEstimator + { if cached_claim_datas.per_input_material.len() == 0 { return None } // But don't prune pending claiming request yet, we may have to resurrect HTLCs let mut inputs = Vec::new(); for outp in cached_claim_datas.per_input_material.keys() { @@ -3354,6 +3392,14 @@ impl> ReadableArgs())); + for _ in 0..pending_events_len { + if let Some(event) = MaybeReadable::read(reader)? { + pending_events.push(event); + } + } + let last_block_hash: Sha256dHash = Readable::read(reader)?; let destination_script = Readable::read(reader)?; let to_remote_rescue = match >::read(reader)? { @@ -3456,6 +3502,7 @@ impl> ReadableArgs