use bitcoin::{BlockHash, TxOut};
use bitcoin::hashes::hex::ToHex;
+use crate::events::MessageSendEvent;
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
use crate::ln::msgs::{self, LightningError, ErrorAction};
-use crate::routing::gossip::{NetworkGraph, NodeId};
+use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use crate::util::logger::{Level, Logger};
use crate::util::ser::Writeable;
use crate::prelude::*;
use alloc::sync::{Arc, Weak};
-use crate::sync::Mutex;
+use crate::sync::{Mutex, LockTestExt};
use core::ops::Deref;
/// An error when accessing the chain via [`UtxoLookup`].
/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
/// variant.
+#[derive(Clone)]
pub enum UtxoResult {
/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
/// requested or a [`UtxoLookupError`].
Full(msgs::ChannelAnnouncement),
Unsigned(msgs::UnsignedChannelAnnouncement),
}
+impl ChannelAnnouncement {
+ fn node_id_1(&self) -> &NodeId {
+ match self {
+ ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
+ ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
+ }
+ }
+}
+
+enum NodeAnnouncement {
+ Full(msgs::NodeAnnouncement),
+ Unsigned(msgs::UnsignedNodeAnnouncement),
+}
+impl NodeAnnouncement {
+ fn timestamp(&self) -> u32 {
+ match self {
+ NodeAnnouncement::Full(msg) => msg.contents.timestamp,
+ NodeAnnouncement::Unsigned(msg) => msg.timestamp,
+ }
+ }
+}
+
+enum ChannelUpdate {
+ Full(msgs::ChannelUpdate),
+ Unsigned(msgs::UnsignedChannelUpdate),
+}
+impl ChannelUpdate {
+ fn timestamp(&self) -> u32 {
+ match self {
+ ChannelUpdate::Full(msg) => msg.contents.timestamp,
+ ChannelUpdate::Unsigned(msg) => msg.timestamp,
+ }
+ }
+}
struct UtxoMessages {
complete: Option<Result<TxOut, UtxoLookupError>>,
channel_announce: Option<ChannelAnnouncement>,
+ latest_node_announce_a: Option<NodeAnnouncement>,
+ latest_node_announce_b: Option<NodeAnnouncement>,
+ latest_channel_update_a: Option<ChannelUpdate>,
+ latest_channel_update_b: Option<ChannelUpdate>,
}
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
Self { state: Arc::new(Mutex::new(UtxoMessages {
complete: None,
channel_announce: None,
+ latest_node_announce_a: None,
+ latest_node_announce_b: None,
+ latest_channel_update_a: None,
+ latest_channel_update_b: None,
}))}
}
/// Resolves this future against the given `graph` and with the given `result`.
- pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
+ ///
+ /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
+ /// forwarding the validated gossip message onwards to peers.
+ ///
+ /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
+ /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
+ /// after this.
+ ///
+ /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
+ /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
+ pub fn resolve_without_forwarding<L: Deref>(&self,
+ graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
where L::Target: Logger {
- let announcement = {
+ self.do_resolve(graph, result);
+ }
+
+ /// Resolves this future against the given `graph` and with the given `result`.
+ ///
+ /// The given `gossip` is used to broadcast any validated messages onwards to all peers which
+ /// have available buffer space.
+ ///
+ /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
+ /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
+ /// after this.
+ ///
+ /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
+ /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
+ pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
+ graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
+ ) where L::Target: Logger, U::Target: UtxoLookup {
+ let mut res = self.do_resolve(graph, result);
+ for msg_opt in res.iter_mut() {
+ if let Some(msg) = msg_opt.take() {
+ gossip.forward_gossip_msg(msg);
+ }
+ }
+ }
+
+ fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
+ -> [Option<MessageSendEvent>; 5] where L::Target: Logger {
+ let (announcement, node_a, node_b, update_a, update_b) = {
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
let mut async_messages = self.state.lock().unwrap();
// `channel_announce` yet. That's okay, we can set the `complete` field which it will
// check once it gets control again.
async_messages.complete = Some(result);
- return;
+ return [None, None, None, None, None];
}
+
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
ChannelAnnouncement::Unsigned(msg) => &msg,
pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
- async_messages.channel_announce.take().unwrap()
+ (async_messages.channel_announce.take().unwrap(),
+ async_messages.latest_node_announce_a.take(),
+ async_messages.latest_node_announce_b.take(),
+ async_messages.latest_channel_update_a.take(),
+ async_messages.latest_channel_update_b.take())
};
+ let mut res = [None, None, None, None, None];
+ let mut res_idx = 0;
+
// Now that we've updated our internal state, pass the pending messages back through the
// network graph with a different `UtxoLookup` which will resolve immediately.
// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
let resolver = UtxoResolver(result);
match announcement {
ChannelAnnouncement::Full(signed_msg) => {
- let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
+ if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
+ res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
+ msg: signed_msg, update_msg: None,
+ });
+ res_idx += 1;
+ }
},
ChannelAnnouncement::Unsigned(msg) => {
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
},
}
+
+ for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
+ match announce {
+ Some(NodeAnnouncement::Full(signed_msg)) => {
+ if graph.update_node_from_announcement(&signed_msg).is_ok() {
+ res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
+ msg: signed_msg,
+ });
+ res_idx += 1;
+ }
+ },
+ Some(NodeAnnouncement::Unsigned(msg)) => {
+ let _ = graph.update_node_from_unsigned_announcement(&msg);
+ },
+ None => {},
+ }
+ }
+
+ for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
+ match update {
+ Some(ChannelUpdate::Full(signed_msg)) => {
+ if graph.update_channel(&signed_msg).is_ok() {
+ res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
+ msg: signed_msg,
+ });
+ res_idx += 1;
+ }
+ },
+ Some(ChannelUpdate::Unsigned(msg)) => {
+ let _ = graph.update_channel_unsigned(&msg);
+ },
+ None => {},
+ }
+ }
+
+ res
}
}
struct PendingChecksContext {
channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
+ nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
}
impl PendingChecksContext {
e.remove();
}
}
+
+ if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
+ e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
+ if e.get().is_empty() { e.remove(); }
+ }
+ if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
+ e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
+ if e.get().is_empty() { e.remove(); }
+ }
}
}
impl PendingChecks {
pub(super) fn new() -> Self {
PendingChecks { internal: Mutex::new(PendingChecksContext {
- channels: HashMap::new(),
+ channels: HashMap::new(), nodes: HashMap::new(),
}) }
}
+ /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
+ /// and, if so, stores the channel message for handling later and returns an `Err`.
+ pub(super) fn check_hold_pending_channel_update(
+ &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
+ ) -> Result<(), LightningError> {
+ let mut pending_checks = self.internal.lock().unwrap();
+ if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
+ let is_from_a = (msg.flags & 1) == 1;
+ match Weak::upgrade(e.get()) {
+ Some(msgs_ref) => {
+ let mut messages = msgs_ref.lock().unwrap();
+ let latest_update = if is_from_a {
+ &mut messages.latest_channel_update_a
+ } else {
+ &mut messages.latest_channel_update_b
+ };
+ if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
+ // If the messages we got has a higher timestamp, just blindly assume the
+ // signatures on the new message are correct and drop the old message. This
+ // may cause us to end up dropping valid `channel_update`s if a peer is
+ // malicious, but we should get the correct ones when the node updates them.
+ *latest_update = Some(
+ if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
+ else { ChannelUpdate::Unsigned(msg.clone()) });
+ }
+ return Err(LightningError {
+ err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
+ action: ErrorAction::IgnoreAndLog(Level::Gossip),
+ });
+ },
+ None => { e.remove(); },
+ }
+ }
+ Ok(())
+ }
+
+ /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
+ /// given node and, if so, stores the channel message for handling later and returns an `Err`.
+ pub(super) fn check_hold_pending_node_announcement(
+ &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
+ ) -> Result<(), LightningError> {
+ let mut pending_checks = self.internal.lock().unwrap();
+ if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
+ let mut found_at_least_one_chan = false;
+ e.get_mut().retain(|node_msgs| {
+ match Weak::upgrade(&node_msgs) {
+ Some(chan_mtx) => {
+ let mut chan_msgs = chan_mtx.lock().unwrap();
+ if let Some(chan_announce) = &chan_msgs.channel_announce {
+ let latest_announce =
+ if *chan_announce.node_id_1() == msg.node_id {
+ &mut chan_msgs.latest_node_announce_a
+ } else {
+ &mut chan_msgs.latest_node_announce_b
+ };
+ if latest_announce.is_none() ||
+ latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
+ {
+ *latest_announce = Some(
+ if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
+ else { NodeAnnouncement::Unsigned(msg.clone()) });
+ }
+ found_at_least_one_chan = true;
+ true
+ } else {
+ debug_assert!(false, "channel_announce is set before struct is added to node map");
+ false
+ }
+ },
+ None => false,
+ }
+ });
+ if e.get().is_empty() { e.remove(); }
+ if found_at_least_one_chan {
+ return Err(LightningError {
+ err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
+ action: ErrorAction::IgnoreAndLog(Level::Gossip),
+ });
+ }
+ }
+ Ok(())
+ }
+
fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
// lookup if we haven't gotten that far yet).
match Weak::upgrade(&e.get()) {
Some(pending_msgs) => {
- let pending_matches = match &pending_msgs.lock().unwrap().channel_announce {
+ // This may be called with the mutex held on a different UtxoMessages
+ // struct, however in that case we have a global lockorder of new messages
+ // -> old messages, which makes this safe.
+ let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
None => {
async_messages.channel_announce = Some(
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
else { ChannelAnnouncement::Unsigned(msg.clone()) });
+ pending_checks.nodes.entry(msg.node_id_1)
+ .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
+ pending_checks.nodes.entry(msg.node_id_2)
+ .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
Err(LightningError {
err: "Channel being checked async".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
}
}
}
+
+ /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
+ /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
+ /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
+ /// which we'll have to process. With a socket buffer of 4KB and a minimum
+ /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
+ /// count` messages to process beyond this limit. Because we'll probably have a few peers,
+ /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
+ /// checks should be more than enough for decent parallelism.
+ const MAX_PENDING_LOOKUPS: usize = 32;
+
+ /// Returns true if there are a large number of async checks pending and future
+ /// `channel_announcement` messages should be delayed. Note that this is only a hint and
+ /// messages already in-flight may still have to be handled for various reasons.
+ pub(super) fn too_many_checks_pending(&self) -> bool {
+ let mut pending_checks = self.internal.lock().unwrap();
+ if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
+ // If we have many channel checks pending, ensure we don't have any dangling checks
+ // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
+ // instead) before we commit to applying backpressure.
+ pending_checks.channels.retain(|_, chan| {
+ Weak::upgrade(&chan).is_some()
+ });
+ pending_checks.nodes.retain(|_, channels| {
+ channels.retain(|chan| Weak::upgrade(&chan).is_some());
+ !channels.is_empty()
+ });
+ pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
+ } else {
+ false
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::routing::gossip::tests::*;
+ use crate::util::test_utils::{TestChainSource, TestLogger};
+ use crate::ln::msgs;
+
+ use bitcoin::secp256k1::{Secp256k1, SecretKey};
+
+ use core::sync::atomic::Ordering;
+
+ fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
+ let logger = Box::new(TestLogger::new());
+ let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
+ let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
+
+ (chain_source, network_graph)
+ }
+
+ fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
+ NetworkGraph<Box<TestLogger>>, bitcoin::Script, msgs::NodeAnnouncement,
+ msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
+ {
+ let secp_ctx = Secp256k1::new();
+
+ let (chain_source, network_graph) = get_network();
+
+ let good_script = get_channel_script(&secp_ctx);
+ let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+ let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+
+ let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
+ let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
+
+ // Note that we have to set the "direction" flag correctly on both messages
+ let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
+ let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
+ let chan_update_c = get_signed_channel_update(|msg| {
+ msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
+
+ (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
+ }
+
+ #[test]
+ fn test_fast_async_lookup() {
+ // Check that async lookups which resolve quicker than the future is returned to the
+ // `get_utxo` call can read it still resolve properly.
+ let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
+ }
+
+ #[test]
+ fn test_async_lookup() {
+ // Test a simple async lookup
+ let (valid_announcement, chain_source, network_graph, good_script,
+ node_a_announce, node_b_announce, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 0, script_pubkey: good_script }));
+ network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
+ network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
+
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+ .unwrap().announcement_info.is_none());
+
+ network_graph.update_node_from_announcement(&node_a_announce).unwrap();
+ network_graph.update_node_from_announcement(&node_b_announce).unwrap();
+
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+ .unwrap().announcement_info.is_some());
+ }
+
+ #[test]
+ fn test_invalid_async_lookup() {
+ // Test an async lookup which returns an incorrect script
+ let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() }));
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+ }
+
+ #[test]
+ fn test_failing_async_lookup() {
+ // Test an async lookup which returns an error
+ let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+ }
+
+ #[test]
+ fn test_updates_async_lookup() {
+ // Test async lookups will process pending channel_update/node_announcements once they
+ // complete.
+ let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ assert_eq!(
+ network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept node_announcement");
+ assert_eq!(
+ network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept node_announcement");
+
+ assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+ assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+
+ assert!(network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
+ assert!(network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
+
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+ .unwrap().announcement_info.is_some());
+ assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
+ .unwrap().announcement_info.is_some());
+ }
+
+ #[test]
+ fn test_latest_update_async_lookup() {
+ // Test async lookups will process the latest channel_update if two are received while
+ // awaiting an async UTXO lookup.
+ let (valid_announcement, chain_source, network_graph, good_script, _,
+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+ assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+ assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+ assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
+ "Awaiting channel_announcement validation to accept channel_update");
+
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+
+ assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
+ let graph_lock = network_graph.read_only();
+ assert!(graph_lock.channels()
+ .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
+ .one_to_two.as_ref().unwrap().last_update !=
+ graph_lock.channels()
+ .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
+ .two_to_one.as_ref().unwrap().last_update);
+ }
+
+ #[test]
+ fn test_no_double_lookups() {
+ // Test that a pending async lookup will prevent a second async lookup from flying, but
+ // only if the channel_announcement message is identical.
+ let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
+
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
+
+ // If we make a second request with the same message, the call count doesn't increase...
+ let future_b = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel announcement is already being checked");
+ assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
+
+ // But if we make a third request with a tweaked message, we should get a second call
+ // against our new future...
+ let secp_ctx = Secp256k1::new();
+ let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
+ let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
+ let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
+ assert_eq!(
+ network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
+ "Channel being checked async");
+ assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
+
+ // Still, if we resolve the original future, the original channel will be accepted.
+ future.resolve_without_forwarding(&network_graph,
+ Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+ assert!(!network_graph.read_only().channels()
+ .get(&valid_announcement.contents.short_channel_id).unwrap()
+ .announcement_message.as_ref().unwrap()
+ .contents.features.supports_unknown_test_feature());
+ }
+
+ #[test]
+ fn test_checks_backpressure() {
+ // Test that too_many_checks_pending returns true when there are many checks pending, and
+ // returns false once they complete.
+ let secp_ctx = Secp256k1::new();
+ let (chain_source, network_graph) = get_network();
+
+ // We cheat and use a single future for all the lookups to complete them all at once.
+ let future = UtxoFuture::new();
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+ let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+
+ for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
+ let valid_announcement = get_signed_channel_announcement(
+ |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+
+ let valid_announcement = get_signed_channel_announcement(
+ |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(network_graph.pending_checks.too_many_checks_pending());
+
+ // Once the future completes the "too many checks" flag should reset.
+ future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+
+ #[test]
+ fn test_checks_backpressure_drop() {
+ // Test that too_many_checks_pending returns true when there are many checks pending, and
+ // returns false if we drop some of the futures without completion.
+ let secp_ctx = Secp256k1::new();
+ let (chain_source, network_graph) = get_network();
+
+ // We cheat and use a single future for all the lookups to complete them all at once.
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
+
+ let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+
+ for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
+ let valid_announcement = get_signed_channel_announcement(
+ |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
+
+ let valid_announcement = get_signed_channel_announcement(
+ |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+ network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+ assert!(network_graph.pending_checks.too_many_checks_pending());
+
+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
+ // should reset to false.
+ *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
+ assert!(!network_graph.pending_checks.too_many_checks_pending());
+ }
}