WatchEventProvider served as a means for replacing ChainWatchInterface.
However, it requires users to explicitly fetch WatchEvents, even if not
interested in them. Replace WatchEventProvider by chain::Filter, which
is an optional member of ChainMonitor. If set, interesting transactions
and output spends are registered such that blocks containing them can be
retrieved from a chain source in an efficient manner.
This is useful when the chain source is not a full node. For Electrum,
it allows for pre-filtered blocks. For BIP157/158, it serves as a means
to match against compact filters.
| ----------------- \ _---------------- / /
| | chain::Access | \ / | ChainMonitor |---------------
| ----------------- \ / ----------------
- | | \ /
-(as RoutingMessageHandler) v v
- \ -------------------- ---------
- -----------------> | NetGraphMsgHandler | | Event |
- -------------------- ---------
+ | | \ / |
+(as RoutingMessageHandler) v v v
+ \ -------------------- --------- -----------------
+ -----------------> | NetGraphMsgHandler | | Event | | chain::Filter |
+ -------------------- --------- -----------------
```
struct TestChainMonitor {
pub logger: Arc<dyn Logger>,
- pub chain_monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+ pub chain_monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
// logic will automatically force-close our channels for us (as we don't have an up-to-date
impl TestChainMonitor {
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
Self {
- chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(broadcaster, logger.clone(), feeest)),
+ chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest)),
logger,
update_ret: Mutex::new(Ok(())),
latest_monitors: Mutex::new(HashMap::new()),
type ChannelMan = ChannelManager<
EnforcingChannelKeys,
- Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+ Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
struct MoneyLossDetector<'a> {
manager: Arc<ChannelMan>,
- monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+ monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
handler: PeerMan<'a>,
peers: &'a RefCell<[bool; 256]>,
impl<'a> MoneyLossDetector<'a> {
pub fn new(peers: &'a RefCell<[bool; 256]>,
manager: Arc<ChannelMan>,
- monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+ monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
handler: PeerMan<'a>) -> Self {
MoneyLossDetector {
manager,
};
let broadcast = Arc::new(TestBroadcaster{});
- let monitor = Arc::new(channelmonitor::ChainMonitor::new(broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
+ let monitor = Arc::new(channelmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
let mut config = UserConfig::default();
//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
//! type Logger = dyn lightning::util::logger::Logger;
//! type ChainAccess = dyn lightning::chain::Access;
-//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
+//! type ChainFilter = dyn lightning::chain::Filter;
+//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>;
//!
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
}
-/// An interface for providing [`WatchEvent`]s.
+/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to
+/// channels.
///
-/// [`WatchEvent`]: enum.WatchEvent.html
-pub trait WatchEventProvider {
- /// Releases events produced since the last call. Subsequent calls must only return new events.
- fn release_pending_watch_events(&self) -> Vec<WatchEvent>;
-}
-
-/// An event indicating on-chain activity to watch for pertaining to a channel.
-pub enum WatchEvent {
- /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
- /// condition.
- WatchTransaction {
- /// Identifier of the transaction.
- txid: Txid,
-
- /// Spending condition for an output of the transaction.
- script_pubkey: Script,
- },
- /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
- /// the spending condition.
- WatchOutput {
- /// Identifier for the output.
- outpoint: OutPoint,
+/// This is useful in order to have a [`Watch`] implementation convey to a chain source which
+/// transactions to be notified of. Notification may take the form of pre-filtering blocks or, in
+/// the case of [BIP 157]/[BIP 158], only fetching a block if the compact filter matches. If
+/// receiving full blocks from a chain source, any further filtering is unnecessary.
+///
+/// After an output has been registered, subsequent block retrievals from the chain source must not
+/// exclude any transactions matching the new criteria nor any in-block descendants of such
+/// transactions.
+///
+/// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter`
+/// should not block on I/O. Implementations should instead queue the newly monitored data to be
+/// processed later. Then, in order to block until the data has been processed, any `Watch`
+/// invocation that has called the `Filter` must return [`TemporaryFailure`].
+///
+/// [`Watch`]: trait.Watch.html
+/// [`TemporaryFailure`]: ../ln/channelmonitor/enum.ChannelMonitorUpdateErr.html#variant.TemporaryFailure
+/// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
+/// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
+pub trait Filter: Send + Sync {
+ /// Registers interest in a transaction with `txid` and having an output with `script_pubkey` as
+ /// a spending condition.
+ fn register_tx(&self, txid: Txid, script_pubkey: Script);
- /// Spending condition for the output.
- script_pubkey: Script,
- }
+ /// Registers interest in spends of a transaction output identified by `outpoint` having
+ /// `script_pubkey` as the spending condition.
+ fn register_output(&self, outpoint: OutPoint, script_pubkey: Script);
}
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
use chain;
+use chain::Filter;
use chain::chaininterface::{ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
use chain::transaction::OutPoint;
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
///
/// [`chain::Watch`]: ../../chain/trait.Watch.html
/// [`ChannelManager`]: ../channelmanager/struct.ChannelManager.html
-pub struct ChainMonitor<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref>
- where T::Target: BroadcasterInterface,
+pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref>
+ where C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
/// The monitors
pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
- watch_events: Mutex<WatchEventQueue>,
+ watch_events: Mutex<WatchEventCache>,
+ chain_source: Option<C>,
broadcaster: T,
logger: L,
fee_estimator: F
}
-struct WatchEventQueue {
+struct WatchEventCache {
watched: ChainWatchedUtil,
- events: Vec<chain::WatchEvent>,
+ events: Vec<WatchEvent>,
}
-impl WatchEventQueue {
+/// An event indicating on-chain activity to watch for pertaining to a channel.
+enum WatchEvent {
+ /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
+ /// condition.
+ WatchTransaction {
+ /// Identifier of the transaction.
+ txid: Txid,
+
+ /// Spending condition for an output of the transaction.
+ script_pubkey: Script,
+ },
+ /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
+ /// the spending condition.
+ WatchOutput {
+ /// Identifier for the output.
+ outpoint: OutPoint,
+
+ /// Spending condition for the output.
+ script_pubkey: Script,
+ }
+}
+
+impl WatchEventCache {
fn new() -> Self {
Self {
watched: ChainWatchedUtil::new(),
fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
if self.watched.register_tx(txid, script_pubkey) {
- self.events.push(chain::WatchEvent::WatchTransaction {
+ self.events.push(WatchEvent::WatchTransaction {
txid: *txid,
script_pubkey: script_pubkey.clone()
});
fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
let (txid, index) = outpoint;
if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
- self.events.push(chain::WatchEvent::WatchOutput {
+ self.events.push(WatchEvent::WatchOutput {
outpoint: OutPoint {
txid: *txid,
index: index as u16,
}
}
- fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
- let mut pending_events = Vec::with_capacity(self.events.len());
- pending_events.append(&mut self.events);
- pending_events
+ fn flush_events<C: Deref>(&mut self, chain_source: &Option<C>) -> bool where C::Target: chain::Filter {
+ let num_events = self.events.len();
+ match chain_source {
+ &None => self.events.clear(),
+ &Some(ref chain_source) => {
+ for event in self.events.drain(..) {
+ match event {
+ WatchEvent::WatchTransaction { txid, script_pubkey } => {
+ chain_source.register_tx(txid, script_pubkey)
+ },
+ WatchEvent::WatchOutput { outpoint, script_pubkey } => {
+ chain_source.register_output(outpoint, script_pubkey)
+ },
+ }
+ }
+ }
+ }
+ num_events > 0
}
fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> {
}
}
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, T, F, L>
- where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
+ where C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
/// be returned by [`chain::Watch::release_pending_monitor_events`].
///
+ /// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch, returning
+ /// `true` if so. Subsequent calls must not exclude any transactions matching the new outputs
+ /// nor any in-block descendants of such transactions. It is not necessary to re-fetch the block
+ /// to obtain updated `txdata`.
+ ///
/// [`ChannelMonitor::block_connected`]: struct.ChannelMonitor.html#method.block_connected
/// [`chain::Watch::release_pending_monitor_events`]: ../../chain/trait.Watch.html#tymethod.release_pending_monitor_events
- pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
+ /// [`chain::Filter`]: ../../chain/trait.Filter.html
+ pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) -> bool {
let mut watch_events = self.watch_events.lock().unwrap();
let matched_txn = watch_events.filter_block(txdata);
{
}
}
}
+ watch_events.flush_events(&self.chain_source)
}
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
}
}
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, T, F, L>
- where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
+ where C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
- /// Creates a new object which can be used to monitor several channels given the chain
- /// interface with which to register to receive notifications.
- pub fn new(broadcaster: T, logger: L, feeest: F) -> Self {
+ /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
+ ///
+ /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
+ /// will call back to it indicating transactions and outputs of interest. This allows clients to
+ /// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
+ /// always need to fetch full blocks absent another means for determining which blocks contain
+ /// transactions relevant to the watched channels.
+ ///
+ /// [`chain::Filter`]: ../../chain/trait.Filter.html
+ pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
Self {
monitors: Mutex::new(HashMap::new()),
- watch_events: Mutex::new(WatchEventQueue::new()),
+ watch_events: Mutex::new(WatchEventCache::new()),
+ chain_source,
broadcaster,
logger,
fee_estimator: feeest,
}
/// Adds the monitor that watches the channel referred to by the given outpoint.
+ ///
+ /// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch.
+ ///
+ /// [`chain::Filter`]: ../../chain/trait.Filter.html
fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
let mut watch_events = self.watch_events.lock().unwrap();
let mut monitors = self.monitors.lock().unwrap();
}
}
entry.insert(monitor);
+ watch_events.flush_events(&self.chain_source);
Ok(())
}
}
}
-impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, T, F, L>
- where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, C, T, F, L>
+ where C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
}
}
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, T, F, L>
- where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, C, T, F, L>
+ where C::Target: chain::Filter,
+ T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
}
}
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> chain::WatchEventProvider for ChainMonitor<ChanSigner, T, F, L>
- where T::Target: BroadcasterInterface,
- F::Target: FeeEstimator,
- L::Target: Logger,
-{
- fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
- self.watch_events.lock().unwrap().dequeue_events()
- }
-}
-
/// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
/// instead claiming it in its own individual transaction.
pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;
//! A bunch of useful utilities for building networks of nodes and exchanging messages between
//! nodes for functional tests.
-use chain;
use chain::Watch;
use chain::transaction::OutPoint;
use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
}
pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) {
- use chain::WatchEventProvider;
-
- let watch_events = node.chain_monitor.chain_monitor.release_pending_watch_events();
- process_chain_watch_events(&watch_events);
-
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
- loop {
- node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height);
-
- let watch_events = node.chain_monitor.chain_monitor.release_pending_watch_events();
- process_chain_watch_events(&watch_events);
-
- if watch_events.is_empty() {
- break;
- }
- }
-
+ while node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height) {}
node.node.block_connected(&block.header, &txdata, height);
}
-fn process_chain_watch_events(_events: &Vec<chain::WatchEvent>) {}
-
pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) {
node.chain_monitor.chain_monitor.block_disconnected(header, height);
node.node.block_disconnected(header);
}).unwrap();
}
- let channel_monitor = test_utils::TestChainMonitor::new(self.tx_broadcaster.clone(), &self.logger, &feeest);
+ let chain_source = test_utils::TestChainSource::new(Network::Testnet);
+ let chain_monitor = test_utils::TestChainMonitor::new(Some(&chain_source), self.tx_broadcaster.clone(), &self.logger, &feeest);
for deserialized_monitor in deserialized_monitors.drain(..) {
- if let Err(_) = channel_monitor.watch_channel(deserialized_monitor.get_funding_txo().0, deserialized_monitor) {
+ if let Err(_) = chain_monitor.watch_channel(deserialized_monitor.get_funding_txo().0, deserialized_monitor) {
panic!();
}
}
+ assert_eq!(*chain_source.watched_txn.lock().unwrap(), *self.chain_source.watched_txn.lock().unwrap());
+ assert_eq!(*chain_source.watched_outputs.lock().unwrap(), *self.chain_source.watched_outputs.lock().unwrap());
}
}
}
for i in 0..node_count {
let seed = [i as u8; 32];
let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet);
- let chain_monitor = test_utils::TestChainMonitor::new(&chanmon_cfgs[i].tx_broadcaster, &chanmon_cfgs[i].logger, &chanmon_cfgs[i].fee_estimator);
+ let chain_monitor = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[i].chain_source), &chanmon_cfgs[i].tx_broadcaster, &chanmon_cfgs[i].logger, &chanmon_cfgs[i].fee_estimator);
nodes.push(NodeCfg { chain_source: &chanmon_cfgs[i].chain_source, logger: &chanmon_cfgs[i].logger, tx_broadcaster: &chanmon_cfgs[i].tx_broadcaster, fee_estimator: &chanmon_cfgs[i].fee_estimator, chain_monitor, keys_manager, node_seed: seed });
}
logger = test_utils::TestLogger::new();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
- new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+ new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
nodes[0].chain_monitor = &new_chain_monitor;
let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read).unwrap();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
logger = test_utils::TestLogger::new();
- new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+ new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
nodes[0].chain_monitor = &new_chain_monitor;
let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read).unwrap();
logger = test_utils::TestLogger::new();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
- new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+ new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
nodes[0].chain_monitor = &new_chain_monitor;
let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read).unwrap();
logger = test_utils::TestLogger::new();
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
- new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+ new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
nodes[0].chain_monitor = &new_chain_monitor;
let mut node_0_stale_monitors = Vec::new();
// We manually create the node configuration to backup the seed.
let seed = [42; 32];
let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet);
- let chain_monitor = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator);
+ let chain_monitor = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator);
let node = NodeCfg { chain_source: &chanmon_cfgs[0].chain_source, logger: &chanmon_cfgs[0].logger, tx_broadcaster: &chanmon_cfgs[0].tx_broadcaster, fee_estimator: &chanmon_cfgs[0].fee_estimator, chain_monitor, keys_manager, node_seed: seed };
let mut node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
node_cfgs.remove(0);
tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())};
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet);
- monitor = test_utils::TestChainMonitor::new(&tx_broadcaster, &logger, &fee_estimator);
+ monitor = test_utils::TestChainMonitor::new(Some(&chain_source), &tx_broadcaster, &logger, &fee_estimator);
node_state_0 = {
let mut channel_monitors = HashMap::new();
channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chain_monitor);
let preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 9_000_000).0;
// Copy ChainMonitor to simulate a watchtower and update block height of node 0 until its ChannelMonitor timeout HTLC onchain
+ let chain_source = test_utils::TestChainSource::new(Network::Testnet);
let logger = test_utils::TestLogger::with_id(format!("node {}", 0));
let watchtower = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::read(
&mut ::std::io::Cursor::new(&w.0)).unwrap().1;
assert!(new_monitor == *monitor);
- let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
+ let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
watchtower
};
route_payment(&nodes[0], &vec!(&nodes[1])[..], 9_000_000).0;
// Copy ChainMonitor to simulate watchtower Alice and update block height her ChannelMonitor timeout HTLC onchain
+ let chain_source = test_utils::TestChainSource::new(Network::Testnet);
let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice"));
let watchtower_alice = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::read(
&mut ::std::io::Cursor::new(&w.0)).unwrap().1;
assert!(new_monitor == *monitor);
- let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
+ let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
watchtower
};
}
// Copy ChainMonitor to simulate watchtower Bob and make it receive a commitment update first.
+ let chain_source = test_utils::TestChainSource::new(Network::Testnet);
let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob"));
let watchtower_bob = {
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap();
let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::read(
&mut ::std::io::Cursor::new(&w.0)).unwrap().1;
assert!(new_monitor == *monitor);
- let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
+ let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
watchtower
};
use bitcoin::blockdata::script::{Builder, Script};
use bitcoin::blockdata::opcodes;
use bitcoin::network::constants::Network;
-use bitcoin::hash_types::BlockHash;
+use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1, Signature};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp, mem};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
pub struct TestVecWriter(pub Vec<u8>);
impl Writer for TestVecWriter {
pub struct TestChainMonitor<'a> {
pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>>,
pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64)>>,
- pub chain_monitor: channelmonitor::ChainMonitor<EnforcingChannelKeys, &'a chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger>,
+ pub chain_monitor: channelmonitor::ChainMonitor<EnforcingChannelKeys, &'a TestChainSource, &'a chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
// If this is set to Some(), after the next return, we'll always return this until update_ret
// is changed:
pub next_update_ret: Mutex<Option<Result<(), channelmonitor::ChannelMonitorUpdateErr>>>,
}
impl<'a> TestChainMonitor<'a> {
- pub fn new(broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self {
+ pub fn new(chain_source: Option<&'a TestChainSource>, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self {
Self {
added_monitors: Mutex::new(Vec::new()),
latest_monitor_update_id: Mutex::new(HashMap::new()),
- chain_monitor: channelmonitor::ChainMonitor::new(broadcaster, logger, fee_estimator),
+ chain_monitor: channelmonitor::ChainMonitor::new(chain_source, broadcaster, logger, fee_estimator),
update_ret: Mutex::new(Ok(())),
next_update_ret: Mutex::new(None),
}
pub struct TestChainSource {
pub genesis_hash: BlockHash,
pub utxo_ret: Mutex<Result<TxOut, chain::AccessError>>,
+ pub watched_txn: Mutex<HashSet<(Txid, Script)>>,
+ pub watched_outputs: Mutex<HashSet<(OutPoint, Script)>>,
}
impl TestChainSource {
Self {
genesis_hash: genesis_block(network).block_hash(),
utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })),
+ watched_txn: Mutex::new(HashSet::new()),
+ watched_outputs: Mutex::new(HashSet::new()),
}
}
}
self.utxo_ret.lock().unwrap().clone()
}
}
+
+impl chain::Filter for TestChainSource {
+ fn register_tx(&self, txid: Txid, script_pubkey: Script) {
+ self.watched_txn.lock().unwrap().insert((txid, script_pubkey));
+ }
+
+ fn register_output(&self, outpoint: OutPoint, script_pubkey: Script) {
+ self.watched_outputs.lock().unwrap().insert((outpoint, script_pubkey));
+ }
+}