X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmonitor.rs;h=fd724c1cacb952a9b38f5b68c37d3ddf945e0077;hb=bd39b20f642e042981e4fdd5f3600a357be51931;hp=9fb2402791a91a8d7dae77e1de2cac5ccb5b7ec4;hpb=1ab0a1acc1e3880b9cb9aece4cd4274bef9c7973;p=rust-lightning diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index 9fb24027..fd724c1c 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -40,7 +40,8 @@ use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HolderCommitmentTransaction, HTLCType}; use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; -use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator}; +use chain; +use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator}; use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; use util::logger::Logger; @@ -48,7 +49,7 @@ use util::ser::{Readable, MaybeReadable, Writer, Writeable, U48}; use util::{byte_utils, events}; use util::events::Event; -use std::collections::{HashMap, hash_map}; +use std::collections::{HashMap, HashSet, hash_map}; use std::sync::Mutex; use std::{hash,cmp, mem}; use std::ops::Deref; @@ -198,12 +199,74 @@ pub struct SimpleManyChannelMonitor>>, + watch_events: Mutex, chain_monitor: C, broadcaster: T, logger: L, fee_estimator: F } +struct WatchEventQueue { + watched: ChainWatchedUtil, + events: Vec, +} + +impl WatchEventQueue { + fn new() -> Self { + Self { + watched: ChainWatchedUtil::new(), + events: Vec::new(), + } + } + + fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) { + if self.watched.register_tx(txid, script_pubkey) { + self.events.push(chain::WatchEvent::WatchTransaction { + txid: *txid, + script_pubkey: script_pubkey.clone() + }); + } + } + + fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) { + let (txid, index) = outpoint; + if self.watched.register_outpoint((*txid, index as u32), script_pubkey) { + self.events.push(chain::WatchEvent::WatchOutput { + outpoint: OutPoint { + txid: *txid, + index: index as u16, + }, + script_pubkey: script_pubkey.clone(), + }); + } + } + + fn dequeue_events(&mut self) -> Vec { + let mut pending_events = Vec::with_capacity(self.events.len()); + pending_events.append(&mut self.events); + pending_events + } + + fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> { + let mut matched_txids = HashSet::new(); + txdata.iter().filter(|&&(_, tx)| { + // A tx matches the filter if it either matches the filter directly (via does_match_tx) + // or if it is a descendant of another matched transaction within the same block. + let mut matched = self.watched.does_match_tx(tx); + for input in tx.input.iter() { + if matched || matched_txids.contains(&input.previous_output.txid) { + matched = true; + break; + } + } + if matched { + matched_txids.insert(tx.txid()); + } + matched + }).map(|e| *e).collect() + } +} + impl ChainListener for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, @@ -212,24 +275,19 @@ impl = matched_indexes.iter().map(|index| txdata[*index]).collect(); - let last_seen = self.chain_monitor.reentered(); - { - let mut monitors = self.monitors.lock().unwrap(); - for monitor in monitors.values_mut() { - let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); - - for (ref txid, ref outputs) in txn_outputs { - for (idx, output) in outputs.iter().enumerate() { - self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey); - } + let mut watch_events = self.watch_events.lock().unwrap(); + let matched_txn = watch_events.filter_block(txdata); + { + let mut monitors = self.monitors.lock().unwrap(); + for monitor in monitors.values_mut() { + let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + + for (ref txid, ref outputs) in txn_outputs { + for (idx, output) in outputs.iter().enumerate() { + watch_events.watch_output((txid, idx), &output.script_pubkey); } } } - reentered = last_seen != self.chain_monitor.reentered(); } } @@ -252,6 +310,7 @@ impl SimpleManyChannelMonitor { let res = SimpleManyChannelMonitor { monitors: Mutex::new(HashMap::new()), + watch_events: Mutex::new(WatchEventQueue::new()), chain_monitor, broadcaster, logger, @@ -263,6 +322,7 @@ impl) -> Result<(), MonitorUpdateError> { + let mut watch_events = self.watch_events.lock().unwrap(); let mut monitors = self.monitors.lock().unwrap(); let entry = match monitors.entry(key) { hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")), @@ -271,11 +331,11 @@ impl chain::WatchEventProvider for SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, +{ + fn release_pending_watch_events(&self) -> Vec { + self.watch_events.lock().unwrap().dequeue_events() + } +} + /// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction, /// instead claiming it in its own individual transaction. pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12; @@ -881,30 +952,14 @@ pub trait ManyChannelMonitor: Send + Sync { /// Adds a monitor for the given `funding_txo`. /// - /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with - /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected - /// callbacks with the funding transaction, or any spends of it. - /// - /// Further, the implementer must also ensure that each output returned in - /// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about - /// any spends of any of the outputs. - /// - /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in FUNDS LOSS. + /// Implementations must ensure that `monitor` receives block_connected calls for blocks with + /// the funding transaction or any spends of it, as well as any spends of outputs returned by + /// get_outputs_to_watch. Not doing so may result in LOST FUNDS. fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; /// Updates a monitor for the given `funding_txo`. /// - /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with - /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected - /// callbacks with the funding transaction, or any spends of it. - /// - /// Further, the implementer must also ensure that each output returned in - /// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about - /// any spends of any of the outputs. - /// - /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in FUNDS LOSS. + /// TODO(jkczyz): Determine where this should go from e73036c6845fd3cc16479a1b497db82a5ebb3897. /// /// In case of distributed watchtowers deployment, even if an Err is return, the new version /// must be written to disk, as state may have been stored but rejected due to a block forcing