use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HolderCommitmentTransaction, HTLCType};
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
-use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator};
+use chain;
+use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
use chain::transaction::OutPoint;
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
use util::logger::Logger;
use util::{byte_utils, events};
use util::events::Event;
-use std::collections::{HashMap, hash_map};
+use std::collections::{HashMap, HashSet, hash_map};
use std::sync::Mutex;
use std::{hash, cmp, mem};
use std::ops::Deref;
{
/// The monitors themselves, keyed by the user-provided per-channel `Key`
pub monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
+ watch_events: Mutex<WatchEventQueue>,
chain_monitor: C,
broadcaster: T,
logger: L,
fee_estimator: F
}
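+/// A queue of `chain::WatchEvent`s released to the chain source via
+/// `chain::WatchEventProvider::release_pending_watch_events`, using `ChainWatchedUtil` to track
+/// prior registrations so the same watch data is not queued twice.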
+struct WatchEventQueue {
+ watched: ChainWatchedUtil,
+ events: Vec<chain::WatchEvent>,
+}
+
+impl WatchEventQueue {
+ fn new() -> Self {
+ Self {
+ watched: ChainWatchedUtil::new(),
+ events: Vec::new(),
+ }
+ }
+
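+ /// Queues a `WatchTransaction` event unless the given txid/script pair was already registered.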
+ fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
+ if self.watched.register_tx(txid, script_pubkey) {
+ self.events.push(chain::WatchEvent::WatchTransaction {
+ txid: *txid,
+ script_pubkey: script_pubkey.clone()
+ });
+ }
+ }
+
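+ /// Queues a `WatchOutput` event unless the given outpoint/script pair was already registered.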
+ fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
+ let (txid, index) = outpoint;
+ if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
+ self.events.push(chain::WatchEvent::WatchOutput {
+ outpoint: OutPoint {
+ txid: *txid,
+ index: index as u16,
+ },
+ script_pubkey: script_pubkey.clone(),
+ });
+ }
+ }
+
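+ /// Returns all events queued since the last call, leaving the queue empty.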
+ fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
+ mem::replace(&mut self.events, Vec::new())
+ }
+
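+ /// Filters `txdata` down to the transactions matching the registered watch data, including
+ /// any in-block descendants of matched transactions.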
+ fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> {
+ let mut matched_txids = HashSet::new();
+ txdata.iter().filter(|&&(_, tx)| {
+ // A tx matches the filter if it either matches the filter directly (via does_match_tx)
+ // or spends an output of another matched transaction within the same block. Transactions
+ // in a block are ordered with parents before children, so a single forward pass also
+ // catches descendants at any depth.
+ let mut matched = self.watched.does_match_tx(tx);
+ for input in tx.input.iter() {
+ if matched || matched_txids.contains(&input.previous_output.txid) {
+ matched = true;
+ break;
+ }
+ }
+ if matched {
+ matched_txids.insert(tx.txid());
+ }
+ matched
+ }).cloned().collect()
+ }
+}
+
impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, C: Deref + Sync + Send>
ChainListener for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
where T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
C::Target: ChainWatchInterface,
{
fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
- let mut reentered = true;
- while reentered {
- let matched_indexes = self.chain_monitor.filter_block(header, txdata);
- let matched_txn: Vec<_> = matched_indexes.iter().map(|index| txdata[*index]).collect();
- let last_seen = self.chain_monitor.reentered();
- {
- let mut monitors = self.monitors.lock().unwrap();
- for monitor in monitors.values_mut() {
- let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
-
- for (ref txid, ref outputs) in txn_outputs {
- for (idx, output) in outputs.iter().enumerate() {
- self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
- }
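+ // filter_block already includes in-block descendants of matched transactions, so a single
+ // pass suffices and the old re-filtering loop (tracked via `reentered`) is no longer needed.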
+ let mut watch_events = self.watch_events.lock().unwrap();
+ let matched_txn = watch_events.filter_block(txdata);
+ {
+ let mut monitors = self.monitors.lock().unwrap();
+ for monitor in monitors.values_mut() {
+ let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+
+ for (ref txid, ref outputs) in txn_outputs {
+ for (idx, output) in outputs.iter().enumerate() {
+ watch_events.watch_output((txid, idx), &output.script_pubkey);
}
}
}
- reentered = last_seen != self.chain_monitor.reentered();
}
}
pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C> {
let res = SimpleManyChannelMonitor {
monitors: Mutex::new(HashMap::new()),
+ watch_events: Mutex::new(WatchEventQueue::new()),
chain_monitor,
broadcaster,
logger,
/// Adds or updates the monitor which monitors the channel referred to by the given key.
pub fn add_monitor_by_key(&self, key: Key, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
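+ // Take the watch_events lock before the monitors lock, matching the lock order in
+ // block_connected, to keep lock acquisition consistent.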
+ let mut watch_events = self.watch_events.lock().unwrap();
let mut monitors = self.monitors.lock().unwrap();
let entry = match monitors.entry(key) {
hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")),
{
let funding_txo = monitor.get_funding_txo();
log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
- self.chain_monitor.install_watch_tx(&funding_txo.0.txid, &funding_txo.1);
- self.chain_monitor.install_watch_outpoint((funding_txo.0.txid, funding_txo.0.index as u32), &funding_txo.1);
+ watch_events.watch_tx(&funding_txo.0.txid, &funding_txo.1);
+ watch_events.watch_output((&funding_txo.0.txid, funding_txo.0.index as usize), &funding_txo.1);
for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
for (idx, script) in outputs.iter().enumerate() {
- self.chain_monitor.install_watch_outpoint((*txid, idx as u32), script);
+ watch_events.watch_output((txid, idx), script);
}
}
}
}
}
+impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref, C: Deref> chain::WatchEventProvider for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
+ where T::Target: BroadcasterInterface,
+ F::Target: FeeEstimator,
+ L::Target: Logger,
+ C::Target: ChainWatchInterface,
+{
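+ /// Dequeues the watch events accumulated since the last call so the caller can register
+ /// them with its chain source. A minimal consumption sketch, assuming `monitor` is this
+ /// `SimpleManyChannelMonitor` and `chain_source` is a caller-side watcher with hypothetical
+ /// `watch_transaction`/`watch_output` methods:
+ ///
+ /// ```ignore
+ /// for event in monitor.release_pending_watch_events() {
+ ///     match event {
+ ///         chain::WatchEvent::WatchTransaction { txid, script_pubkey } =>
+ ///             chain_source.watch_transaction(&txid, &script_pubkey),
+ ///         chain::WatchEvent::WatchOutput { outpoint, script_pubkey } =>
+ ///             chain_source.watch_output(&outpoint, &script_pubkey),
+ ///     }
+ /// }
+ /// ```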
+ fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
+ self.watch_events.lock().unwrap().dequeue_events()
+ }
+}
+
/// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
/// instead claiming it in its own individual transaction.
pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;
/// Adds a monitor for the given `funding_txo`.
///
- /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
- /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
- /// callbacks with the funding transaction, or any spends of it.
- ///
- /// Further, the implementer must also ensure that each output returned in
- /// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about
- /// any spends of any of the outputs.
- ///
- /// Any spends of outputs which should have been registered which aren't passed to
- /// ChannelMonitors via block_connected may result in FUNDS LOSS.
+ /// Implementations must ensure that `monitor` receives block_connected calls for blocks with
+ /// the funding transaction or any spends of it, as well as any spends of outputs returned by
+ /// get_outputs_to_watch. Not doing so may result in LOST FUNDS.
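+ ///
+ /// A minimal sketch of the registrations this implies, assuming a `ChainWatchInterface`-style
+ /// watcher and a `funding_script` binding for the channel's funding script (both illustrative,
+ /// not part of this trait):
+ ///
+ /// ```ignore
+ /// chain_watch.install_watch_tx(&funding_txo.txid, &funding_script);
+ /// chain_watch.install_watch_outpoint((funding_txo.txid, funding_txo.index as u32), &funding_script);
+ /// for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
+ ///     for (idx, script) in outputs.iter().enumerate() {
+ ///         chain_watch.install_watch_outpoint((*txid, idx as u32), script);
+ ///     }
+ /// }
+ /// ```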
fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr>;
/// Updates a monitor for the given `funding_txo`.
///
- /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
- /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
- /// callbacks with the funding transaction, or any spends of it.
- ///
- /// Further, the implementer must also ensure that each output returned in
- /// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about
- /// any spends of any of the outputs.
- ///
- /// Any spends of outputs which should have been registered which aren't passed to
- /// ChannelMonitors via block_connected may result in FUNDS LOSS.
+ /// TODO(jkczyz): Determine where this should go from e73036c6845fd3cc16479a1b497db82a5ebb3897.
///
/// In the case of a distributed watchtower deployment, even if an Err is returned, the new
/// version must be written to disk, as state may have been stored but rejected due to a block forcing
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001, InitFeatures::known(), InitFeatures::known());
- assert!(nodes[0].chain_monitor.does_match_tx(&tx));
- assert!(nodes[1].chain_monitor.does_match_tx(&tx));
-
let block = Block {
header: BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
txdata: vec![tx],
#[test]
fn test_htlc_on_chain_success() {
- // Test that in case of a unilateral close onchain, we detect the state of output thanks to
- // ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is
+ // Test that in case of a unilateral close onchain, we detect the state of the output and
+ // pass the preimage backward accordingly. So here we test that ChannelManager is
// broadcasting the right event to other nodes in the payment path.
// We test with two HTLCs simultaneously as that was not handled correctly in the past.
// A --------------------> B ----------------------> C (preimage)
#[test]
fn test_htlc_on_chain_timeout() {
- // Test that in case of a unilateral close onchain, we detect the state of output thanks to
- // ChainWatchInterface and timeout the HTLC backward accordingly. So here we test that ChannelManager is
+ // Test that in case of a unilateral close onchain, we detect the state of the output and
+ // time out the HTLC backward accordingly. So here we test that ChannelManager is
// broadcasting the right event to other nodes in the payment path.
// A ------------------> B ----------------------> C (timeout)
// B's commitment tx C's commitment tx
#[test]
fn test_onchain_to_onchain_claim() {
- // Test that in case of channel closure, we detect the state of output thanks to
- // ChainWatchInterface and claim HTLC on downstream peer's remote commitment tx.
+ // Test that in case of channel closure, we detect the state of the output and claim the
+ // HTLC on the downstream peer's remote commitment tx.
// First, have C claim an HTLC against its own latest commitment transaction.
// Then, broadcast these to B, which should update the monitor downstream on the A<->B
// channel.