X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Frouting%2Futxo.rs;h=8733def13d95d26dc3d9c446cb5851d39c3b2ca3;hb=fbaa3c4855ec3ebfb62fe91437f4237582db2577;hp=5a129f88da44a8c63515ab820574582db1be94e8;hpb=4948c3b26fea84eb0e8ecaf06351250888e88c4f;p=rust-lightning diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 5a129f88..8733def1 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -16,17 +16,17 @@ use bitcoin::{BlockHash, TxOut}; use bitcoin::hashes::hex::ToHex; +use crate::events::MessageSendEvent; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, LightningError, ErrorAction}; use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; -use crate::util::events::MessageSendEvent; use crate::util::logger::{Level, Logger}; use crate::util::ser::Writeable; use crate::prelude::*; use alloc::sync::{Arc, Weak}; -use crate::sync::Mutex; +use crate::sync::{Mutex, LockTestExt}; use core::ops::Deref; /// An error when accessing the chain via [`UtxoLookup`]. @@ -125,7 +125,7 @@ pub struct UtxoFuture { /// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph /// once we have a concrete resolution of a request. -struct UtxoResolver(Result); +pub(crate) struct UtxoResolver(Result); impl UtxoLookup for UtxoResolver { fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { UtxoResult::Sync(self.0.clone()) @@ -368,11 +368,6 @@ impl PendingChecks { if latest_announce.is_none() || latest_announce.as_ref().unwrap().timestamp() < msg.timestamp { - // If the messages we got has a higher timestamp, just blindly - // assume the signatures on the new message are correct and drop - // the old message. This may cause us to end up dropping valid - // `node_announcement`s if a peer is malicious, but we should get - // the correct ones when the node updates them. *latest_announce = Some( if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } else { NodeAnnouncement::Unsigned(msg.clone()) }); @@ -409,7 +404,10 @@ impl PendingChecks { // lookup if we haven't gotten that far yet). match Weak::upgrade(&e.get()) { Some(pending_msgs) => { - let pending_matches = match &pending_msgs.lock().unwrap().channel_announce { + // This may be called with the mutex held on a different UtxoMessages + // struct, however in that case we have a global lockorder of new messages + // -> old messages, which makes this safe. + let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce { Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg, Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg, None => { @@ -543,7 +541,7 @@ impl PendingChecks { let mut pending_checks = self.internal.lock().unwrap(); if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { // If we have many channel checks pending, ensure we don't have any dangling checks - // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture` + // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture` // instead) before we commit to applying backpressure. pending_checks.channels.retain(|_, chan| { Weak::upgrade(&chan).is_some() @@ -558,3 +556,308 @@ impl PendingChecks { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::routing::gossip::tests::*; + use crate::util::test_utils::{TestChainSource, TestLogger}; + use crate::ln::msgs; + + use bitcoin::secp256k1::{Secp256k1, SecretKey}; + + use core::sync::atomic::Ordering; + + fn get_network() -> (TestChainSource, NetworkGraph>) { + let logger = Box::new(TestLogger::new()); + let chain_source = TestChainSource::new(bitcoin::Network::Testnet); + let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger); + + (chain_source, network_graph) + } + + fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource, + NetworkGraph>, bitcoin::Script, msgs::NodeAnnouncement, + msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate) + { + let secp_ctx = Secp256k1::new(); + + let (chain_source, network_graph) = get_network(); + + let good_script = get_channel_script(&secp_ctx); + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + + let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); + let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx); + + // Note that we have to set the "direction" flag correctly on both messages + let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx); + let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx); + let chan_update_c = get_signed_channel_update(|msg| { + msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx); + + (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, chan_update_c) + } + + #[test] + fn test_fast_async_lookup() { + // Check that async lookups which resolve quicker than the future is returned to the + // `get_utxo` call can read it still resolve properly. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some()); + } + + #[test] + fn test_async_lookup() { + // Test a simple async lookup + let (valid_announcement, chain_source, network_graph, good_script, + node_a_announce, node_b_announce, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 0, script_pubkey: good_script })); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_none()); + + network_graph.update_node_from_announcement(&node_a_announce).unwrap(); + network_graph.update_node_from_announcement(&node_b_announce).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_invalid_async_lookup() { + // Test an async lookup which returns an incorrect script + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() })); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_failing_async_lookup() { + // Test an async lookup which returns an error + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_updates_async_lookup() { + // Test async lookups will process pending channel_update/node_announcements once they + // complete. + let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!( + network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + assert_eq!( + network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some()); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_latest_update_async_lookup() { + // Test async lookups will process the latest channel_update if two are received while + // awaiting an async UTXO lookup. + let (valid_announcement, chain_source, network_graph, good_script, _, + _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); + let graph_lock = network_graph.read_only(); + assert!(graph_lock.channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .one_to_two.as_ref().unwrap().last_update != + graph_lock.channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .two_to_one.as_ref().unwrap().last_update); + } + + #[test] + fn test_no_double_lookups() { + // Test that a pending async lookup will prevent a second async lookup from flying, but + // only if the channel_announcement message is identical. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // If we make a second request with the same message, the call count doesn't increase... + let future_b = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel announcement is already being checked"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // But if we make a third request with a tweaked message, we should get a second call + // against our new future... + let secp_ctx = Secp256k1::new(); + let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap(); + let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap(); + let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx); + assert_eq!( + network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); + + // Still, if we resolve the original future, the original channel will be accepted. + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + assert!(!network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap() + .announcement_message.as_ref().unwrap() + .contents.features.supports_unknown_test_feature()); + } + + #[test] + fn test_checks_backpressure() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false once they complete. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future completes the "too many checks" flag should reset. + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + #[test] + fn test_checks_backpressure_drop() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false if we drop some of the futures without completion. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag + // should reset to false. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } +}