Anyone is invited to contribute without regard to technical experience, "expertise", OSS
experience, age, or other concern. However, the development of cryptocurrencies demands a
-high-level of rigor, adversial thinking, thorough testing and risk-minimization.
-Any bug may cost users real money. That said we deeply welcome people contributing
+high-level of rigor, adversarial thinking, thorough testing and risk-minimization.
+Any bug may cost users real money. That being said, we deeply welcome people contributing
for the first time to an open source project or pick up Rust while contributing. Don't be shy,
you'll learn.
use std::collections::HashSet;
use std::ops::Deref;
use std::marker::PhantomData;
+use std::ptr;
/// Used to give chain error details upstream
pub enum ChainError {
pub const MIN_RELAY_FEE_SAT_PER_1000_WEIGHT: u64 = 4000;
/// Utility for tracking registered txn/outpoints and checking for matches
+#[cfg_attr(test, derive(PartialEq))]
pub struct ChainWatchedUtil {
watch_all: bool,
}
/// Register the given listener to receive events.
- // TODO: unregister
pub fn register_listener(&self, listener: CL) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}
+ /// Unregister the given listener to no longer
+ /// receive events.
+ ///
+ /// If the same listener is registered multiple times, unregistering
+ /// will remove ALL occurrences of that listener. Comparison is done using
+ /// the pointer returned by the Deref trait implementation.
+ pub fn unregister_listener(&self, listener: CL) {
+ let mut vec = self.listeners.lock().unwrap();
+ // item is a ref to an abstract thing that dereferences to a ChainListener,
+ // so dereference it twice to get the ChainListener itself
+ vec.retain(|item | !ptr::eq(&(**item), &(*listener)));
+ }
/// Notify listeners that a block was connected given a full, unfiltered block.
///
logger: Arc<Logger>,
}
+// We only expose PartialEq in test since its somewhat unclear exactly what it should do and we're
+// only comparing a subset of fields (essentially just checking that the set of things we're
+// watching is the same).
+#[cfg(test)]
+impl PartialEq for ChainWatchInterfaceUtil {
+ fn eq(&self, o: &Self) -> bool {
+ self.network == o.network &&
+ *self.watched.lock().unwrap() == *o.watched.lock().unwrap()
+ }
+}
+
/// Register listener
impl ChainWatchInterface for ChainWatchInterfaceUtil {
fn install_watch_tx(&self, txid: &Sha256dHash, script_pub_key: &Script) {
watched.does_match_tx(tx)
}
}
+
+#[cfg(test)]
+mod tests {
+ use ln::functional_test_utils::{create_node_cfgs};
+ use super::{BlockNotifier, ChainListener};
+ use std::ptr;
+
+ #[test]
+ fn register_listener_test() {
+ let node_cfgs = create_node_cfgs(1);
+ let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone());
+ assert_eq!(block_notifier.listeners.lock().unwrap().len(), 0);
+ let listener = &node_cfgs[0].chan_monitor.simple_monitor as &ChainListener;
+ block_notifier.register_listener(listener);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 1);
+ let item = vec.first().clone().unwrap();
+ assert!(ptr::eq(&(**item), &(*listener)));
+ }
+
+ #[test]
+ fn unregister_single_listener_test() {
+ let node_cfgs = create_node_cfgs(2);
+ let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone());
+ let listener1 = &node_cfgs[0].chan_monitor.simple_monitor as &ChainListener;
+ let listener2 = &node_cfgs[1].chan_monitor.simple_monitor as &ChainListener;
+ block_notifier.register_listener(listener1);
+ block_notifier.register_listener(listener2);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 2);
+ drop(vec);
+ block_notifier.unregister_listener(listener1);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 1);
+ let item = vec.first().clone().unwrap();
+ assert!(ptr::eq(&(**item), &(*listener2)));
+ }
+
+ #[test]
+ fn unregister_single_listener_ref_test() {
+ let node_cfgs = create_node_cfgs(2);
+ let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone());
+ block_notifier.register_listener(&node_cfgs[0].chan_monitor.simple_monitor as &ChainListener);
+ block_notifier.register_listener(&node_cfgs[1].chan_monitor.simple_monitor as &ChainListener);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 2);
+ drop(vec);
+ block_notifier.unregister_listener(&node_cfgs[0].chan_monitor.simple_monitor);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 1);
+ let item = vec.first().clone().unwrap();
+ assert!(ptr::eq(&(**item), &(*&node_cfgs[1].chan_monitor.simple_monitor)));
+ }
+
+ #[test]
+ fn unregister_multiple_of_the_same_listeners_test() {
+ let node_cfgs = create_node_cfgs(2);
+ let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone());
+ let listener1 = &node_cfgs[0].chan_monitor.simple_monitor as &ChainListener;
+ let listener2 = &node_cfgs[1].chan_monitor.simple_monitor as &ChainListener;
+ block_notifier.register_listener(listener1);
+ block_notifier.register_listener(listener1);
+ block_notifier.register_listener(listener2);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 3);
+ drop(vec);
+ block_notifier.unregister_listener(listener1);
+ let vec = block_notifier.listeners.lock().unwrap();
+ assert_eq!(vec.len(), 1);
+ let item = vec.first().clone().unwrap();
+ assert!(ptr::eq(&(**item), &(*listener2)));
+ }
+}
}
(self.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?;
for htlc in self.pending_inbound_htlcs.iter() {
+ if let &InboundHTLCState::RemoteAnnounced(_) = &htlc.state {
+ continue; // Drop
+ }
htlc.htlc_id.write(writer)?;
htlc.amount_msat.write(writer)?;
htlc.cltv_expiry.write(writer)?;
htlc.payment_hash.write(writer)?;
match &htlc.state {
- &InboundHTLCState::RemoteAnnounced(_) => {}, // Drop
+ &InboundHTLCState::RemoteAnnounced(_) => unreachable!(),
&InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_state) => {
1u8.write(writer)?;
htlc_state.write(writer)?;
let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
for _ in 0..channel_count {
let mut channel: Channel<ChanSigner> = ReadableArgs::read(reader, args.logger.clone())?;
- if channel.last_block_connected != last_block_hash {
+ if channel.last_block_connected != Default::default() && channel.last_block_connected != last_block_hash {
return Err(DecodeError::InvalidValue);
}
pub trait ManyChannelMonitor<ChanSigner: ChannelKeys>: Send + Sync {
/// Adds or updates a monitor for the given `funding_txo`.
///
- /// Implementor must also ensure that the funding_txo outpoint is registered with any relevant
- /// ChainWatchInterfaces such that the provided monitor receives block_connected callbacks with
- /// any spends of it.
+ /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
+ /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
+ /// callbacks with the funding transaction, or any spends of it.
+ ///
+ /// Further, the implementer must also ensure that each output returned in
+ /// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about
+ /// any spends of any of the outputs.
+ ///
+ /// Any spends of outputs which should have been registered which aren't passed to
+ /// ChannelMonitors via block_connected may result in funds loss.
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
self.chain_monitor.watch_all_txn();
}
}
+ for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
+ for (idx, script) in outputs.iter().enumerate() {
+ self.chain_monitor.install_watch_outpoint((*txid, idx as u32), script);
+ }
+ }
monitors.insert(key, monitor);
Ok(())
}
// actions when we receive a block with given height. Actions depend on OnchainEvent type.
onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+ // If we get serialized out and re-read, we need to make sure that the chain monitoring
+ // interface knows about the TXOs that we want to be notified of spends of. We could probably
+ // be smart and derive them from the above storage fields, but its much simpler and more
+ // Obviously Correct (tm) if we just keep track of them explicitly.
+ outputs_to_watch: HashMap<Sha256dHash, Vec<Script>>,
+
// We simply modify last_block_hash in Channel's block_connected so that serialization is
// consistent but hopefully the users' copy handles block_connected in a consistent way.
// (we do *not*, however, update them in insert_combine to ensure any local user copies keep
self.to_remote_rescue != other.to_remote_rescue ||
self.pending_claim_requests != other.pending_claim_requests ||
self.claimable_outpoints != other.claimable_outpoints ||
- self.onchain_events_waiting_threshold_conf != other.onchain_events_waiting_threshold_conf
+ self.onchain_events_waiting_threshold_conf != other.onchain_events_waiting_threshold_conf ||
+ self.outputs_to_watch != other.outputs_to_watch
{
false
} else {
}
}
+ (self.outputs_to_watch.len() as u64).write(writer)?;
+ for (txid, output_scripts) in self.outputs_to_watch.iter() {
+ txid.write(writer)?;
+ (output_scripts.len() as u64).write(writer)?;
+ for script in output_scripts.iter() {
+ script.write(writer)?;
+ }
+ }
+
Ok(())
}
claimable_outpoints: HashMap::new(),
onchain_events_waiting_threshold_conf: HashMap::new(),
+ outputs_to_watch: HashMap::new(),
last_block_hash: Default::default(),
secp_ctx: Secp256k1::new(),
}
}
+ /// Gets a list of txids, with their output scripts (in the order they appear in the
+ /// transaction), which we must learn about spends of via block_connected().
+ pub fn get_outputs_to_watch(&self) -> &HashMap<Sha256dHash, Vec<Script>> {
+ &self.outputs_to_watch
+ }
+
/// Gets the sets of all outpoints which this ChannelMonitor expects to hear about spends of.
/// Generally useful when deserializing as during normal operation the return values of
/// block_connected are sufficient to ensure all relevant outpoints are being monitored (note
}
}
+ /// Called by SimpleManyChannelMonitor::block_connected, which implements
+ /// ChainListener::block_connected.
+ /// Eventually this should be pub and, roughly, implement ChainListener, however this requires
+ /// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
+ /// on-chain.
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
for tx in txn_matched {
let mut output_val = 0;
}
}
self.last_block_hash = block_hash.clone();
+ for &(ref txid, ref output_scripts) in watch_outputs.iter() {
+ self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
+ }
(watch_outputs, spendable_outputs, htlc_updated)
}
onchain_events_waiting_threshold_conf.insert(height_target, events);
}
+ let outputs_to_watch_len: u64 = Readable::read(reader)?;
+ let mut outputs_to_watch = HashMap::with_capacity(cmp::min(outputs_to_watch_len as usize, MAX_ALLOC_SIZE / (mem::size_of::<Sha256dHash>() + mem::size_of::<Vec<Script>>())));
+ for _ in 0..outputs_to_watch_len {
+ let txid = Readable::read(reader)?;
+ let outputs_len: u64 = Readable::read(reader)?;
+ let mut outputs = Vec::with_capacity(cmp::min(outputs_len as usize, MAX_ALLOC_SIZE / mem::size_of::<Script>()));
+ for _ in 0..outputs_len {
+ outputs.push(Readable::read(reader)?);
+ }
+ if let Some(_) = outputs_to_watch.insert(txid, outputs) {
+ return Err(DecodeError::InvalidValue);
+ }
+ }
+
Ok((last_block_hash.clone(), ChannelMonitor {
commitment_transaction_number_obscure_factor,
claimable_outpoints,
onchain_events_waiting_threshold_conf,
+ outputs_to_watch,
last_block_hash,
secp_ctx,
use chain::chaininterface;
use chain::transaction::OutPoint;
use chain::keysinterface::KeysInterface;
-use ln::channelmanager::{ChannelManager,RAACommitmentOrder, PaymentPreimage, PaymentHash};
+use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash};
+use ln::channelmonitor::{ChannelMonitor, ManyChannelMonitor};
use ln::router::{Route, Router};
use ln::features::InitFeatures;
use ln::msgs;
use util::errors::APIError;
use util::logger::Logger;
use util::config::UserConfig;
+use util::ser::{ReadableArgs, Writeable};
use bitcoin::util::hash::BitcoinHash;
use bitcoin::blockdata::block::BlockHeader;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::mem;
-use std::collections::HashSet;
+use std::collections::{HashSet, HashMap};
pub const CHAN_CONFIRM_DEPTH: u32 = 100;
pub fn confirm_transaction<'a, 'b: 'a>(notifier: &'a chaininterface::BlockNotifierRef<'b>, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
assert!(self.node.get_and_clear_pending_msg_events().is_empty());
assert!(self.node.get_and_clear_pending_events().is_empty());
assert!(self.chan_monitor.added_monitors.lock().unwrap().is_empty());
+
+ // Check that if we serialize and then deserialize all our channel monitors we get the
+ // same set of outputs to watch for on chain as we have now. Note that if we write
+ // tests that fully close channels and remove the monitors at some point this may break.
+ let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
+ let old_monitors = self.chan_monitor.simple_monitor.monitors.lock().unwrap();
+ let mut deserialized_monitors = Vec::new();
+ for (_, old_monitor) in old_monitors.iter() {
+ let mut w = test_utils::TestVecWriter(Vec::new());
+ old_monitor.write_for_disk(&mut w).unwrap();
+ let (_, deserialized_monitor) = <(Sha256d, ChannelMonitor<EnforcingChannelKeys>)>::read(
+ &mut ::std::io::Cursor::new(&w.0), Arc::clone(&self.logger) as Arc<Logger>).unwrap();
+ deserialized_monitors.push(deserialized_monitor);
+ }
+
+ // Before using all the new monitors to check the watch outpoints, use the full set of
+ // them to ensure we can write and reload our ChannelManager.
+ {
+ let mut channel_monitors = HashMap::new();
+ for monitor in deserialized_monitors.iter_mut() {
+ channel_monitors.insert(monitor.get_funding_txo().unwrap(), monitor);
+ }
+
+ let mut w = test_utils::TestVecWriter(Vec::new());
+ self.node.write(&mut w).unwrap();
+ <(Sha256d, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor>)>::read(&mut ::std::io::Cursor::new(w.0), ChannelManagerReadArgs {
+ default_config: UserConfig::default(),
+ keys_manager: self.keys_manager.clone(),
+ fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }),
+ monitor: self.chan_monitor,
+ tx_broadcaster: self.tx_broadcaster.clone(),
+ logger: Arc::new(test_utils::TestLogger::new()),
+ channel_monitors: &mut channel_monitors,
+ }).unwrap();
+ }
+
+ let chain_watch = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&self.logger) as Arc<Logger>));
+ let channel_monitor = test_utils::TestChannelMonitor::new(chain_watch.clone(), self.tx_broadcaster.clone(), self.logger.clone(), feeest);
+ for deserialized_monitor in deserialized_monitors.drain(..) {
+ if let Err(_) = channel_monitor.add_update_monitor(deserialized_monitor.get_funding_txo().unwrap(), deserialized_monitor) {
+ panic!();
+ }
+ }
+ if *chain_watch != *self.chain_monitor {
+ panic!();
+ }
}
}
}
}
}
+fn do_test_sanity_on_in_flight_opens(steps: u8) {
+ // Previously, we had issues deserializing channels when we hadn't connected the first block
+ // after creation. To catch that and similar issues, we lean on the Node::drop impl to test
+ // serialization round-trips and simply do steps towards opening a channel and then drop the
+ // Node objects.
+
+ let node_cfgs = create_node_cfgs(2);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+ let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+ if steps & 0b1000_0000 != 0{
+ let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ nodes[0].block_notifier.block_connected_checked(&header, 1, &Vec::new(), &[0; 0]);
+ nodes[1].block_notifier.block_connected_checked(&header, 1, &Vec::new(), &[0; 0]);
+ }
+
+ if steps & 0x0f == 0 { return; }
+ nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100000, 10001, 42).unwrap();
+ let open_channel = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+
+ if steps & 0x0f == 1 { return; }
+ nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), InitFeatures::supported(), &open_channel);
+ let accept_channel = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+
+ if steps & 0x0f == 2 { return; }
+ nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), InitFeatures::supported(), &accept_channel);
+
+ let (temporary_channel_id, tx, funding_output) = create_funding_transaction(&nodes[0], 100000, 42);
+
+ if steps & 0x0f == 3 { return; }
+ {
+ nodes[0].node.funding_transaction_generated(&temporary_channel_id, funding_output);
+ let mut added_monitors = nodes[0].chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors[0].0, funding_output);
+ added_monitors.clear();
+ }
+ let funding_created = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
+
+ if steps & 0x0f == 4 { return; }
+ nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created);
+ {
+ let mut added_monitors = nodes[1].chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors[0].0, funding_output);
+ added_monitors.clear();
+ }
+ let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+
+ if steps & 0x0f == 5 { return; }
+ nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed);
+ {
+ let mut added_monitors = nodes[0].chan_monitor.added_monitors.lock().unwrap();
+ assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors[0].0, funding_output);
+ added_monitors.clear();
+ }
+
+ let events_4 = nodes[0].node.get_and_clear_pending_events();
+ assert_eq!(events_4.len(), 1);
+ match events_4[0] {
+ Event::FundingBroadcastSafe { ref funding_txo, user_channel_id } => {
+ assert_eq!(user_channel_id, 42);
+ assert_eq!(*funding_txo, funding_output);
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ if steps & 0x0f == 6 { return; }
+ create_chan_between_nodes_with_value_confirm_first(&nodes[0], &nodes[1], &tx);
+
+ if steps & 0x0f == 7 { return; }
+ confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &tx, tx.version);
+ create_chan_between_nodes_with_value_confirm_second(&nodes[1], &nodes[0]);
+}
+
+#[test]
+fn test_sanity_on_in_flight_opens() {
+ do_test_sanity_on_in_flight_opens(0);
+ do_test_sanity_on_in_flight_opens(0 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(1);
+ do_test_sanity_on_in_flight_opens(1 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(2);
+ do_test_sanity_on_in_flight_opens(2 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(3);
+ do_test_sanity_on_in_flight_opens(3 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(4);
+ do_test_sanity_on_in_flight_opens(4 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(5);
+ do_test_sanity_on_in_flight_opens(5 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(6);
+ do_test_sanity_on_in_flight_opens(6 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(7);
+ do_test_sanity_on_in_flight_opens(7 | 0b1000_0000);
+ do_test_sanity_on_in_flight_opens(8);
+ do_test_sanity_on_in_flight_opens(8 | 0b1000_0000);
+}
+
#[test]
fn test_update_fee_vanilla() {
let node_cfgs = create_node_cfgs(2);
mod wire;
#[cfg(test)]
-#[macro_use] mod functional_test_utils;
+#[macro_use]
+pub(crate) mod functional_test_utils;
#[cfg(test)]
mod functional_tests;
#[cfg(test)]