1 // This file is Copyright its original authors, visible in version control
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
10 //! This module contains traits for LDK to access UTXOs to check gossip data is correct.
12 //! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
13 //! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
14 //! order to announce a channel. This module handles that checking.
17 use bitcoin::blockdata::constants::ChainHash;
21 use crate::events::MessageSendEvent;
22 use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
23 use crate::ln::msgs::{self, LightningError, ErrorAction};
24 use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
25 use crate::util::logger::{Level, Logger};
27 use crate::prelude::*;
29 use alloc::sync::{Arc, Weak};
30 use crate::sync::{Mutex, LockTestExt};
33 /// An error when accessing the chain via [`UtxoLookup`].
34 #[derive(Clone, Debug)]
35 pub enum UtxoLookupError {
// NOTE(review): the variant identifiers themselves (original lines 37-38 and 40-41,
// matched elsewhere in this file as `UnknownChain` and `UnknownTx`) are elided from
// this excerpt; only their doc comments are visible below.
36 /// The requested chain is unknown.
39 /// The requested transaction doesn't exist or hasn't confirmed.
43 /// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
44 /// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
// NOTE(review): the doc sentence continuation and the `pub enum UtxoResult {` header
// (original lines 45-47) are elided from this excerpt.
48 /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
49 /// requested or a [`UtxoLookupError`].
50 Sync(Result<TxOut, UtxoLookupError>),
51 /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
52 /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
54 /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
55 /// but only fairly loosely. Because pending checks block all message processing, leaving
56 /// checks pending for an extended time may cause DoS of other functions. It is recommended you
57 /// keep a tight timeout on lookups, on the order of a few seconds.
61 /// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
62 pub trait UtxoLookup {
63 /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
64 /// Returns an error if `chain_hash` is for a different chain or if such a transaction output is
// NOTE(review): the end of this doc sentence (original lines 65-66) is elided from this
// excerpt — presumably "...is unknown"; confirm against the full file.
67 /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
68 fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
// Wraps a `channel_announcement` in either signed (`Full`) or unsigned form so both can
// be queued uniformly while a UTXO lookup is pending.
71 enum ChannelAnnouncement {
72 Full(msgs::ChannelAnnouncement),
73 Unsigned(msgs::UnsignedChannelAnnouncement),
75 impl ChannelAnnouncement {
// Accessor for the first node's id, regardless of which variant we hold.
// NOTE(review): the `match` header and closing braces are elided from this excerpt.
76 fn node_id_1(&self) -> &NodeId {
78 ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
79 ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
// Wraps a `node_announcement` in either signed (`Full`) or unsigned form, queued while a
// channel UTXO lookup for one of the node's channels is pending.
84 enum NodeAnnouncement {
85 Full(msgs::NodeAnnouncement),
86 Unsigned(msgs::UnsignedNodeAnnouncement),
88 impl NodeAnnouncement {
// Timestamp accessor, used to keep only the newest pending announcement per node.
// NOTE(review): the `match` header and closing braces are elided from this excerpt.
89 fn timestamp(&self) -> u32 {
91 NodeAnnouncement::Full(msg) => msg.contents.timestamp,
92 NodeAnnouncement::Unsigned(msg) => msg.timestamp,
// NOTE(review): the `enum ChannelUpdate {` header (original lines ~96-97) is elided from
// this excerpt. Like the wrappers above, this holds a signed or unsigned `channel_update`
// queued while the channel's UTXO lookup is pending.
98 Full(msgs::ChannelUpdate),
99 Unsigned(msgs::UnsignedChannelUpdate),
// Timestamp accessor, used to keep only the newest pending update per direction.
102 fn timestamp(&self) -> u32 {
104 ChannelUpdate::Full(msg) => msg.contents.timestamp,
105 ChannelUpdate::Unsigned(msg) => msg.timestamp,
// Shared state behind a [`UtxoFuture`]: the lookup result (once available) plus any gossip
// messages received while the lookup was in flight, to be replayed on resolution.
110 struct UtxoMessages {
// Set by `do_resolve` if the future resolves before `check_channel_announcement` has
// stored `channel_announce` (the "fast resolution" race).
111 complete: Option<Result<TxOut, UtxoLookupError>>,
112 channel_announce: Option<ChannelAnnouncement>,
// Latest announcement/update per node (a/b) and per direction (a/b) seen while pending;
// older ones are dropped in favor of higher timestamps.
113 latest_node_announce_a: Option<NodeAnnouncement>,
114 latest_node_announce_b: Option<NodeAnnouncement>,
115 latest_channel_update_a: Option<ChannelUpdate>,
116 latest_channel_update_b: Option<ChannelUpdate>,
119 /// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
121 /// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
// NOTE(review): a `#[derive(Clone)]` (elided here) presumably exists — `future.clone()`
// is used in the tests below; confirm against the full file.
123 pub struct UtxoFuture {
124 state: Arc<Mutex<UtxoMessages>>,
127 /// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
128 /// once we have a concrete resolution of a request.
129 pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
130 impl UtxoLookup for UtxoResolver {
// Always answers synchronously with the pre-resolved result, ignoring the query args.
131 fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
132 UtxoResult::Sync(self.0.clone())
137 /// Builds a new future for later resolution.
138 pub fn new() -> Self {
// NOTE(review): the `complete: None,` initializer (original line 140) and the closing
// braces are elided from this excerpt.
139 Self { state: Arc::new(Mutex::new(UtxoMessages {
141 channel_announce: None,
142 latest_node_announce_a: None,
143 latest_node_announce_b: None,
144 latest_channel_update_a: None,
145 latest_channel_update_b: None,
149 /// Resolves this future against the given `graph` and with the given `result`.
151 /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
152 /// forwarding the validated gossip message onwards to peers.
154 /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
155 /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
158 /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
159 /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
160 pub fn resolve_without_forwarding<L: Deref>(&self,
161 graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
162 where L::Target: Logger {
// Apply the result to the graph; the returned broadcast events are intentionally dropped.
163 self.do_resolve(graph, result);
166 /// Resolves this future against the given `graph` and with the given `result`.
168 /// The given `gossip` is used to broadcast any validated messages onwards to all peers which
169 /// have available buffer space.
171 /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
172 /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
175 /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
176 /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
177 pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
178 graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
179 ) where L::Target: Logger, U::Target: UtxoLookup {
// Apply the result to the graph, then forward each now-validated message to peers.
180 let mut res = self.do_resolve(graph, result);
181 for msg_opt in res.iter_mut() {
182 if let Some(msg) = msg_opt.take() {
183 gossip.forward_gossip_msg(msg);
// Core resolution logic shared by `resolve` and `resolve_without_forwarding`: applies the
// lookup `result` to the graph and returns up to 5 broadcast events (1 announcement, 2 node
// announcements, 2 channel updates) for the caller to optionally forward.
// NOTE(review): several `match` headers, closing braces, and the `res_idx` bookkeeping
// lines are elided from this excerpt.
188 fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
189 -> [Option<MessageSendEvent>; 5] where L::Target: Logger {
190 let (announcement, node_a, node_b, update_a, update_b) = {
// Lock order here is graph-level `pending_checks` first, then the per-future state —
// NOTE(review): presumably this matches a global lock order; confirm against the full file.
191 let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
192 let mut async_messages = self.state.lock().unwrap();
194 if async_messages.channel_announce.is_none() {
195 // We raced returning to `check_channel_announcement` which hasn't updated
196 // `channel_announce` yet. That's okay, we can set the `complete` field which it will
197 // check once it gets control again.
198 async_messages.complete = Some(result);
199 return [None, None, None, None, None];
202 let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
203 ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
204 ChannelAnnouncement::Unsigned(msg) => &msg,
// Drop this lookup from the pending-checks maps before releasing the locks.
207 pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
209 (async_messages.channel_announce.take().unwrap(),
210 async_messages.latest_node_announce_a.take(),
211 async_messages.latest_node_announce_b.take(),
212 async_messages.latest_channel_update_a.take(),
213 async_messages.latest_channel_update_b.take())
216 let mut res = [None, None, None, None, None];
219 // Now that we've updated our internal state, pass the pending messages back through the
220 // network graph with a different `UtxoLookup` which will resolve immediately.
221 // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
223 let resolver = UtxoResolver(result);
225 ChannelAnnouncement::Full(signed_msg) => {
226 if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
227 res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
228 msg: signed_msg, update_msg: None,
233 ChannelAnnouncement::Unsigned(msg) => {
// Unsigned messages are applied to the graph but never re-broadcast.
234 let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
238 for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
240 Some(NodeAnnouncement::Full(signed_msg)) => {
241 if graph.update_node_from_announcement(&signed_msg).is_ok() {
242 res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
248 Some(NodeAnnouncement::Unsigned(msg)) => {
249 let _ = graph.update_node_from_unsigned_announcement(&msg);
255 for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
257 Some(ChannelUpdate::Full(signed_msg)) => {
258 if graph.update_channel(&signed_msg).is_ok() {
259 res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
265 Some(ChannelUpdate::Unsigned(msg)) => {
266 let _ = graph.update_channel_unsigned(&msg);
// Interior of [`PendingChecks`]: maps SCID -> pending lookup state, and node id -> all
// pending lookups touching that node. `Weak` refs let dropped futures be garbage-collected.
276 struct PendingChecksContext {
277 channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
278 nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
281 impl PendingChecksContext {
// Removes a completed lookup from both maps. Called from `do_resolve` with the lock held.
282 fn lookup_completed(&mut self,
283 msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
// Only remove the channel entry if it still points at *this* lookup — a newer lookup for
// the same SCID may have replaced it.
285 if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
286 if Weak::ptr_eq(e.get(), &completed_state) {
291 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
292 e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
293 if e.get().is_empty() { e.remove(); }
295 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
296 e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
297 if e.get().is_empty() { e.remove(); }
302 /// A set of messages which are pending UTXO lookups for processing.
303 pub(super) struct PendingChecks {
304 internal: Mutex<PendingChecksContext>,
// Constructs an empty pending-checks tracker with no in-flight lookups.
308 pub(super) fn new() -> Self {
309 PendingChecks { internal: Mutex::new(PendingChecksContext {
310 channels: new_hash_map(), nodes: new_hash_map(),
314 /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
315 /// and, if so, stores the channel message for handling later and returns an `Err`.
316 pub(super) fn check_hold_pending_channel_update(
317 &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
318 ) -> Result<(), LightningError> {
319 let mut pending_checks = self.internal.lock().unwrap();
320 if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
// BOLT 7: bit 0 of `flags` is the direction bit, selecting which side's update this is.
321 let is_from_a = (msg.flags & 1) == 1;
322 match Weak::upgrade(e.get()) {
// NOTE(review): the `Some(msgs_ref) =>` arm header (original line 323) is elided here.
324 let mut messages = msgs_ref.lock().unwrap();
325 let latest_update = if is_from_a {
326 &mut messages.latest_channel_update_a
328 &mut messages.latest_channel_update_b
330 if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
331 // If the messages we got has a higher timestamp, just blindly assume the
332 // signatures on the new message are correct and drop the old message. This
333 // may cause us to end up dropping valid `channel_update`s if a peer is
334 // malicious, but we should get the correct ones when the node updates them.
335 *latest_update = Some(
336 if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
337 else { ChannelUpdate::Unsigned(msg.clone()) });
339 return Err(LightningError {
340 err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
341 action: ErrorAction::IgnoreAndLog(Level::Gossip),
// The future was dropped without resolving; clear the stale entry and accept the update.
344 None => { e.remove(); },
350 /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
351 /// given node and, if so, stores the channel message for handling later and returns an `Err`.
352 pub(super) fn check_hold_pending_node_announcement(
353 &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
354 ) -> Result<(), LightningError> {
355 let mut pending_checks = self.internal.lock().unwrap();
356 if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
357 let mut found_at_least_one_chan = false;
// `retain` doubles as garbage collection: dropped (un-upgradeable) lookups are pruned
// while live ones get the announcement queued.
358 e.get_mut().retain(|node_msgs| {
359 match Weak::upgrade(&node_msgs) {
// NOTE(review): the `Some(chan_mtx) =>` arm header (original line 360) is elided here.
361 let mut chan_msgs = chan_mtx.lock().unwrap();
362 if let Some(chan_announce) = &chan_msgs.channel_announce {
363 let latest_announce =
364 if *chan_announce.node_id_1() == msg.node_id {
365 &mut chan_msgs.latest_node_announce_a
367 &mut chan_msgs.latest_node_announce_b
// Keep only the newest announcement by timestamp, mirroring the channel_update path.
369 if latest_announce.is_none() ||
370 latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
372 *latest_announce = Some(
373 if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
374 else { NodeAnnouncement::Unsigned(msg.clone()) });
376 found_at_least_one_chan = true;
379 debug_assert!(false, "channel_announce is set before struct is added to node map");
386 if e.get().is_empty() { e.remove(); }
387 if found_at_least_one_chan {
388 return Err(LightningError {
389 err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
390 action: ErrorAction::IgnoreAndLog(Level::Gossip),
// Deduplicates lookups by SCID: returns `Err(IgnoreDuplicateGossip)` if an identical
// announcement is already being checked, otherwise inserts/replaces `replacement` (if any)
// as the tracked pending lookup for this SCID.
397 fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
398 full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
399 pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
400 ) -> Result<(), msgs::LightningError> {
401 match pending_channels.entry(msg.short_channel_id) {
402 hash_map::Entry::Occupied(mut e) => {
403 // There's already a pending lookup for the given SCID. Check if the messages
404 // are the same and, if so, return immediately (don't bother spawning another
405 // lookup if we haven't gotten that far yet).
406 match Weak::upgrade(&e.get()) {
407 Some(pending_msgs) => {
408 // This may be called with the mutex held on a different UtxoMessages
409 // struct, however in that case we have a global lockorder of new messages
410 // -> old messages, which makes this safe.
411 let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
412 Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
413 Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
415 // This shouldn't actually be reachable. We set the
416 // `channel_announce` field under the same lock as setting the
417 // channel map entry. Still, we can just treat it as
418 // non-matching and let the new request fly.
419 debug_assert!(false);
424 return Err(LightningError {
425 err: "Channel announcement is already being checked".to_owned(),
426 action: ErrorAction::IgnoreDuplicateGossip,
429 // The earlier lookup is a different message. If we have another
430 // request in-flight now replace the original.
431 // Note that in the replace case whether to replace is somewhat
432 // arbitrary - both results will be handled, we're just updating the
433 // value that will be compared to future lookups with the same SCID.
434 if let Some(item) = replacement {
440 // The earlier lookup already resolved. We can't be sure its the same
441 // so just remove/replace it and move on.
442 if let Some(item) = replacement {
444 } else { e.remove(); }
448 hash_map::Entry::Vacant(v) => {
449 if let Some(item) = replacement { v.insert(item); }
// Validates a `channel_announcement` against the chain via `utxo_lookup`. Returns
// `Ok(Some(amount_sats))` on a successful sync check (amount taken from the found `TxOut`),
// and `Err` for script mismatch, unknown chain/tx, duplicates, or an async check in flight.
// NOTE(review): several `match` headers, the `None =>` (no-lookup) arm, and closing braces
// are elided from this excerpt.
455 pub(super) fn check_channel_announcement<U: Deref>(&self,
456 utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
457 full_msg: Option<&msgs::ChannelAnnouncement>
458 ) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
459 let handle_result = |res| {
461 Ok(TxOut { value, script_pubkey }) => {
// The on-chain output must be the P2WSH of the 2-of-2 built from the announced keys.
462 let expected_script =
463 make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_v0_p2wsh();
464 if script_pubkey != expected_script {
465 return Err(LightningError{
466 err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
467 expected_script.to_hex_string(), script_pubkey.to_hex_string()),
468 action: ErrorAction::IgnoreError
473 Err(UtxoLookupError::UnknownChain) => {
475 err: format!("Channel announced on an unknown chain ({})",
476 msg.chain_hash.to_bytes().as_hex()),
477 action: ErrorAction::IgnoreError
480 Err(UtxoLookupError::UnknownTx) => {
482 err: "Channel announced without corresponding UTXO entry".to_owned(),
483 action: ErrorAction::IgnoreError
// First reject exact-duplicate in-flight checks before spawning a lookup.
489 Self::check_replace_previous_entry(msg, full_msg, None,
490 &mut self.internal.lock().unwrap().channels)?;
494 // Tentatively accept, potentially exposing us to DoS attacks
497 &Some(ref utxo_lookup) => {
498 match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
499 UtxoResult::Sync(res) => handle_result(res),
500 UtxoResult::Async(future) => {
501 let mut pending_checks = self.internal.lock().unwrap();
502 let mut async_messages = future.state.lock().unwrap();
503 if let Some(res) = async_messages.complete.take() {
504 // In the unlikely event the future resolved before we managed to get it,
505 // handle the result in-line.
// Register this in-flight lookup under the SCID and both node ids so later gossip
// for this channel/these nodes can be held until resolution.
508 Self::check_replace_previous_entry(msg, full_msg,
509 Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
510 async_messages.channel_announce = Some(
511 if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
512 else { ChannelAnnouncement::Unsigned(msg.clone()) });
513 pending_checks.nodes.entry(msg.node_id_1)
514 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
515 pending_checks.nodes.entry(msg.node_id_2)
516 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
518 err: "Channel being checked async".to_owned(),
519 action: ErrorAction::IgnoreAndLog(Level::Gossip),
528 /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
529 /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
530 /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
531 /// which we'll have to process. With a socket buffer of 4KB and a minimum
532 /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
533 /// count` messages to process beyond this limit. Because we'll probably have a few peers,
534 /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
535 /// checks should be more than enough for decent parallelism.
536 const MAX_PENDING_LOOKUPS: usize = 32;
538 /// Returns true if there are a large number of async checks pending and future
539 /// `channel_announcement` messages should be delayed. Note that this is only a hint and
540 /// messages already in-flight may still have to be handled for various reasons.
541 pub(super) fn too_many_checks_pending(&self) -> bool {
542 let mut pending_checks = self.internal.lock().unwrap();
543 if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
544 // If we have many channel checks pending, ensure we don't have any dangling checks
545 // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
546 // instead) before we commit to applying backpressure.
547 pending_checks.channels.retain(|_, chan| {
548 Weak::upgrade(&chan).is_some()
// Prune node-map entries the same way so both maps stay consistent.
550 pending_checks.nodes.retain(|_, channels| {
551 channels.retain(|chan| Weak::upgrade(&chan).is_some());
// Re-check the threshold after pruning; only live checks count toward backpressure.
554 pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
// NOTE(review): the `mod tests {` header and several `#[test]`/brace lines are elided from
// this excerpt; original line numbers jump accordingly.
564 use crate::routing::gossip::tests::*;
565 use crate::util::test_utils::{TestChainSource, TestLogger};
567 use bitcoin::secp256k1::{Secp256k1, SecretKey};
569 use core::sync::atomic::Ordering;
// Builds a fresh Testnet chain source + empty network graph for each test.
571 fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
572 let logger = Box::new(TestLogger::new());
573 let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
574 let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
576 (chain_source, network_graph)
// Builds a full fixture set: a valid announcement, both node announcements, and three
// channel updates (a/b directions plus a later-timestamped duplicate of b).
579 fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
580 NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
581 msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
583 let secp_ctx = Secp256k1::new();
585 let (chain_source, network_graph) = get_network();
587 let good_script = get_channel_script(&secp_ctx);
588 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
589 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
590 let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
592 let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
593 let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
595 // Note that we have to set the "direction" flag correctly on both messages
596 let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
597 let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
598 let chan_update_c = get_signed_channel_update(|msg| {
599 msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
601 (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
602 node_b_announce, chan_update_a, chan_update_b, chan_update_c)
// NOTE(review): `#[test]` attributes, `assert_eq!(` openers, and closing braces are elided
// from this excerpt throughout the test bodies below.
606 fn test_fast_async_lookup() {
607 // Check that async lookups which resolve quicker than the future is returned to the
608 // `get_utxo` call can read it still resolve properly.
609 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
// Resolve the future *before* handing it to the graph, exercising the `complete` race path.
611 let future = UtxoFuture::new();
612 future.resolve_without_forwarding(&network_graph,
613 Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
614 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
616 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
617 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
621 fn test_async_lookup() {
622 // Test a simple async lookup
623 let (valid_announcement, chain_source, network_graph, good_script,
624 node_a_announce, node_b_announce, ..) = get_test_objects();
626 let future = UtxoFuture::new();
627 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
630 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
631 "Channel being checked async");
632 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
634 future.resolve_without_forwarding(&network_graph,
635 Ok(TxOut { value: 0, script_pubkey: good_script }));
636 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
637 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
639 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
640 .unwrap().announcement_info.is_none());
642 network_graph.update_node_from_announcement(&node_a_announce).unwrap();
643 network_graph.update_node_from_announcement(&node_b_announce).unwrap();
645 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
646 .unwrap().announcement_info.is_some());
650 fn test_invalid_async_lookup() {
651 // Test an async lookup which returns an incorrect script
652 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
654 let future = UtxoFuture::new();
655 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
658 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
659 "Channel being checked async");
660 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
// An empty script can't match the funding redeemscript, so the channel must be rejected.
662 future.resolve_without_forwarding(&network_graph,
663 Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::ScriptBuf::new() }));
664 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
668 fn test_failing_async_lookup() {
669 // Test an async lookup which returns an error
670 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
672 let future = UtxoFuture::new();
673 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
676 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
677 "Channel being checked async");
678 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
680 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
681 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
685 fn test_updates_async_lookup() {
686 // Test async lookups will process pending channel_update/node_announcements once they
// NOTE(review): the continuation of this comment (original line 687, presumably
// "...resolve") and the `assert_eq!(` openers are elided from this excerpt.
688 let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
689 node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
691 let future = UtxoFuture::new();
692 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
695 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
696 "Channel being checked async");
697 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
// While the lookup is pending, dependent gossip is held rather than applied.
700 network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
701 "Awaiting channel_announcement validation to accept node_announcement");
703 network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
704 "Awaiting channel_announcement validation to accept node_announcement");
706 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
707 "Awaiting channel_announcement validation to accept channel_update");
708 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
709 "Awaiting channel_announcement validation to accept channel_update");
711 future.resolve_without_forwarding(&network_graph,
712 Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
// On resolution, the held updates/announcements are replayed into the graph.
714 assert!(network_graph.read_only().channels()
715 .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
716 assert!(network_graph.read_only().channels()
717 .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
719 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
720 .unwrap().announcement_info.is_some());
721 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
722 .unwrap().announcement_info.is_some());
726 fn test_latest_update_async_lookup() {
727 // Test async lookups will process the latest channel_update if two are received while
728 // awaiting an async UTXO lookup.
729 let (valid_announcement, chain_source, network_graph, good_script, _,
730 _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
732 let future = UtxoFuture::new();
733 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
736 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
737 "Channel being checked async");
738 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
740 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
741 "Awaiting channel_announcement validation to accept channel_update");
742 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
743 "Awaiting channel_announcement validation to accept channel_update");
744 assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
745 "Awaiting channel_announcement validation to accept channel_update");
747 future.resolve_without_forwarding(&network_graph,
748 Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
// `chan_update_c` (timestamp +1) must have won over `chan_update_b` for the b direction,
// so the two directions end up with different last_update timestamps.
750 assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
751 let graph_lock = network_graph.read_only();
752 assert!(graph_lock.channels()
753 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
754 .one_to_two.as_ref().unwrap().last_update !=
755 graph_lock.channels()
756 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
757 .two_to_one.as_ref().unwrap().last_update);
761 fn test_no_double_lookups() {
762 // Test that a pending async lookup will prevent a second async lookup from flying, but
763 // only if the channel_announcement message is identical.
764 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
766 let future = UtxoFuture::new();
767 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
770 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
771 "Channel being checked async");
772 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
774 // If we make a second request with the same message, the call count doesn't increase...
775 let future_b = UtxoFuture::new();
776 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
778 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
779 "Channel announcement is already being checked");
780 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
782 // But if we make a third request with a tweaked message, we should get a second call
783 // against our new future...
784 let secp_ctx = Secp256k1::new();
// Different keys produce a different (non-matching) announcement for the same SCID.
785 let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
786 let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
787 let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
789 network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
790 "Channel being checked async");
791 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
793 // Still, if we resolve the original future, the original channel will be accepted.
794 future.resolve_without_forwarding(&network_graph,
795 Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
796 assert!(!network_graph.read_only().channels()
797 .get(&valid_announcement.contents.short_channel_id).unwrap()
798 .announcement_message.as_ref().unwrap()
799 .contents.features.supports_unknown_test_feature());
803 fn test_checks_backpressure() {
804 // Test that too_many_checks_pending returns true when there are many checks pending, and
805 // returns false once they complete.
806 let secp_ctx = Secp256k1::new();
807 let (chain_source, network_graph) = get_network();
809 // We cheat and use a single future for all the lookups to complete them all at once.
810 let future = UtxoFuture::new();
811 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
813 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
814 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
// Fill exactly up to the limit: the flag must stay false until we exceed it.
816 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
817 let valid_announcement = get_signed_channel_announcement(
818 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
819 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
820 assert!(!network_graph.pending_checks.too_many_checks_pending());
// One more check (distinct SCID, the un-tweaked announcement) pushes us over the limit.
823 let valid_announcement = get_signed_channel_announcement(
824 |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
825 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
826 assert!(network_graph.pending_checks.too_many_checks_pending());
828 // Once the future completes the "too many checks" flag should reset.
829 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
830 assert!(!network_graph.pending_checks.too_many_checks_pending());
834 fn test_checks_backpressure_drop() {
835 // Test that too_many_checks_pending returns true when there are many checks pending, and
836 // returns false if we drop some of the futures without completion.
837 let secp_ctx = Secp256k1::new();
838 let (chain_source, network_graph) = get_network();
// Unlike the test above we never keep a handle to the future, so dropping `utxo_ret`
// later drops the only strong reference and dangles the pending checks.
840 // We cheat and use a single future for all the lookups to complete them all at once.
841 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
843 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
844 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
846 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
847 let valid_announcement = get_signed_channel_announcement(
848 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
849 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
850 assert!(!network_graph.pending_checks.too_many_checks_pending());
853 let valid_announcement = get_signed_channel_announcement(
854 |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
855 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
856 assert!(network_graph.pending_checks.too_many_checks_pending());
858 // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
859 // should reset to false.
860 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
861 assert!(!network_graph.pending_checks.too_many_checks_pending());