if latest_announce.is_none() ||
latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
{
- // If the messages we got has a higher timestamp, just blindly
- // assume the signatures on the new message are correct and drop
- // the old message. This may cause us to end up dropping valid
- // `node_announcement`s if a peer is malicious, but we should get
- // the correct ones when the node updates them.
*latest_announce = Some(
if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
else { NodeAnnouncement::Unsigned(msg.clone()) });
let mut pending_checks = self.internal.lock().unwrap();
if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
// If we have many channel checks pending, ensure we don't have any dangling checks
- // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture`
+ // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
// instead) before we commit to applying backpressure.
pending_checks.channels.retain(|_, chan| {
Weak::upgrade(&chan).is_some()
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::routing::gossip::tests::*;
+ use crate::util::test_utils::{TestChainSource, TestLogger};
+ use crate::ln::msgs;
+
+ use bitcoin::blockdata::constants::genesis_block;
+ use bitcoin::secp256k1::{Secp256k1, SecretKey};
+
+ use core::sync::atomic::Ordering;
+
+ fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
+ let logger = Box::new(TestLogger::new());
+ let genesis_hash = genesis_block(bitcoin::Network::Testnet).header.block_hash();
+ let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
+ let network_graph = NetworkGraph::new(genesis_hash, logger);
+
+ (chain_source, network_graph)
+ }
+
+ fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
+ NetworkGraph<Box<TestLogger>>, bitcoin::Script, msgs::NodeAnnouncement,
+ msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
+ {
+ let secp_ctx = Secp256k1::new();
+
+ let (chain_source, network_graph) = get_network();
+
+ let good_script = get_channel_script(&secp_ctx);
+ let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+ let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+
+ let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
+ let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
+
+ // Note that we have to set the "direction" flag correctly on both messages
+ let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
+ let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
+ let chan_update_c = get_signed_channel_update(|msg| {
+ msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
+
+ (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
+ }
+
+ #[test]
+ fn test_fast_async_lookup() {
+ // Check that async lookups which resolve quicker than the future is returned to the
+ // `get_utxo` call can read it still resolve properly.
+ let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
+ }
+
+ #[test]
+ fn test_async_lookup() {
+ // Test a simple async lookup
+ let (valid_announcement, chain_source, network_graph, good_script,
+ node_a_announce, node_b_announce, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 0, script_pubkey: good_script }));
+ network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
+ network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
+
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+ .unwrap().announcement_info.is_none());
+
+ network_graph.update_node_from_announcement(&node_a_announce).unwrap();
+ network_graph.update_node_from_announcement(&node_b_announce).unwrap();
+
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+ .unwrap().announcement_info.is_some());
+ }
+
+ #[test]
+ fn test_invalid_async_lookup() {
+ // Test an async lookup which returns an incorrect script
+ let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() }));
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+ }
+
+ #[test]
+ fn test_failing_async_lookup() {
+ // Test an async lookup which returns an error
+ let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+ }
+
+ #[test]
+ fn test_updates_async_lookup() {
+ // Test async lookups will process pending channel_update/node_announcements once they
+ // complete.
+ let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ assert_eq!(
+ network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept node_announcement");
+ assert_eq!(
+ network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept node_announcement");
+
+ assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+ assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+
+ assert!(network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
+ assert!(network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
+
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+ .unwrap().announcement_info.is_some());
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
+ .unwrap().announcement_info.is_some());
+ }
+
+ #[test]
+ fn test_latest_update_async_lookup() {
+ // Test async lookups will process the latest channel_update if two are received while
+ // awaiting an async UTXO lookup.
+ let (valid_announcement, chain_source, network_graph, good_script, _,
+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+ assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+ assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+
+ assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
+ assert!(network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
+ .one_to_two.as_ref().unwrap().last_update !=
+ network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
+ .two_to_one.as_ref().unwrap().last_update);
+ }
+
+ #[test]
+ fn test_no_double_lookups() {
+ // Test that a pending async lookup will prevent a second async lookup from flying, but
+ // only if the channel_announcement message is identical.
+ let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
+
+ // If we make a second request with the same message, the call count doesn't increase...
+ let future_b = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel announcement is already being checked");
+ assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
+
+ // But if we make a third request with a tweaked message, we should get a second call
+ // against our new future...
+ let secp_ctx = Secp256k1::new();
+ let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
+ let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
+ let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
+
+ // Still, if we resolve the original future, the original channel will be accepted.
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+ assert!(!network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).unwrap()
+ .announcement_message.as_ref().unwrap()
+ .contents.features.supports_unknown_test_feature());
+ }
+
+ #[test]
+ fn test_checks_backpressure() {
+ // Test that too_many_checks_pending returns true when there are many checks pending, and
+ // returns false once they complete.
+ let secp_ctx = Secp256k1::new();
+ let (chain_source, network_graph) = get_network();
+
+ // We cheat and use a single future for all the lookups to complete them all at once.
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+
+ for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
+ let valid_announcement = get_signed_channel_announcement(
+ |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+
+ let valid_announcement = get_signed_channel_announcement(
+ |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(network_graph.pending_checks.too_many_checks_pending());
+
+ // Once the future completes the "too many checks" flag should reset.
+ future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+
+ #[test]
+ fn test_checks_backpressure_drop() {
+ // Test that too_many_checks_pending returns true when there are many checks pending, and
+ // returns false if we drop some of the futures without completion.
+ let secp_ctx = Secp256k1::new();
+ let (chain_source, network_graph) = get_network();
+
+ // We cheat and use a single future for all the lookups to complete them all at once.
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
+
+ let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+
+ for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
+ let valid_announcement = get_signed_channel_announcement(
+ |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+
+ let valid_announcement = get_signed_channel_announcement(
+ |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(network_graph.pending_checks.too_many_checks_pending());
+
+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
+ // should reset to false.
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+}