Merge pull request #3081 from G8XSU/2024-05-08-claimable-persist-3049-outputs
[rust-lightning] / lightning / src / routing / utxo.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! This module contains traits for LDK to access UTXOs to check gossip data is correct.
11 //!
12 //! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
13 //! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
14 //! order to announce a channel. This module handles that checking.
15
16 use bitcoin::TxOut;
17 use bitcoin::amount::Amount;
18 use bitcoin::blockdata::constants::ChainHash;
19
20 use hex::DisplayHex;
21
22 use crate::events::MessageSendEvent;
23 use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
24 use crate::ln::msgs::{self, LightningError, ErrorAction};
25 use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
26 use crate::util::logger::{Level, Logger};
27
28 use crate::prelude::*;
29
30 use alloc::sync::{Arc, Weak};
31 use crate::sync::{Mutex, LockTestExt};
32 use core::ops::Deref;
33
/// An error when accessing the chain via [`UtxoLookup`].
///
/// Values of this type are carried in [`UtxoResult::Sync`] and are the failure case handed to
/// [`UtxoFuture::resolve`]. When one is seen while checking a `channel_announcement`, the
/// announcement is rejected.
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
        /// The requested chain is unknown.
        UnknownChain,

        /// The requested transaction doesn't exist or hasn't confirmed.
        UnknownTx,
}
43
/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
/// variant.
#[derive(Clone)]
pub enum UtxoResult {
        /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
        /// requested or a [`UtxoLookupError`].
        Sync(Result<TxOut, UtxoLookupError>),
        /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
        /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
        ///
        /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
        /// but only fairly loosely. Because pending checks block all message processing, leaving
        /// checks pending for an extended time may cause DoS of other functions. It is recommended you
        /// keep a tight timeout on lookups, on the order of a few seconds.
        Async(UtxoFuture),
}
61
/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
///
/// Implementations may answer immediately or hand back a future to be resolved later; see
/// [`UtxoResult`] for the two possibilities.
pub trait UtxoLookup {
        /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
        /// Returns an error if `chain_hash` is for a different chain or if such a transaction output is
        /// unknown.
        ///
        /// The answer is wrapped in a [`UtxoResult`]: either the outcome itself
        /// ([`UtxoResult::Sync`]) or a [`UtxoFuture`] which will be resolved later
        /// ([`UtxoResult::Async`]).
        ///
        /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
        fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
}
71
72 enum ChannelAnnouncement {
73         Full(msgs::ChannelAnnouncement),
74         Unsigned(msgs::UnsignedChannelAnnouncement),
75 }
76 impl ChannelAnnouncement {
77         fn node_id_1(&self) -> &NodeId {
78                 match self {
79                         ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
80                         ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
81                 }
82         }
83 }
84
85 enum NodeAnnouncement {
86         Full(msgs::NodeAnnouncement),
87         Unsigned(msgs::UnsignedNodeAnnouncement),
88 }
89 impl NodeAnnouncement {
90         fn timestamp(&self) -> u32 {
91                 match self {
92                         NodeAnnouncement::Full(msg) => msg.contents.timestamp,
93                         NodeAnnouncement::Unsigned(msg) => msg.timestamp,
94                 }
95         }
96 }
97
98 enum ChannelUpdate {
99         Full(msgs::ChannelUpdate),
100         Unsigned(msgs::UnsignedChannelUpdate),
101 }
102 impl ChannelUpdate {
103         fn timestamp(&self) -> u32 {
104                 match self {
105                         ChannelUpdate::Full(msg) => msg.contents.timestamp,
106                         ChannelUpdate::Unsigned(msg) => msg.timestamp,
107                 }
108         }
109 }
110
/// The state shared between a [`UtxoFuture`] and the pending-check maps: the announcement being
/// checked plus the newest gossip messages which were held back while the check is in flight.
struct UtxoMessages {
        /// The lookup result, stored here only when the future was resolved before
        /// `channel_announce` had been filled in. `PendingChecks::check_channel_announcement` takes
        /// it and handles it in-line.
        complete: Option<Result<TxOut, UtxoLookupError>>,
        /// The `channel_announcement` whose UTXO lookup is pending. Set once the lookup is known to
        /// be asynchronous and taken again when the future is resolved.
        channel_announce: Option<ChannelAnnouncement>,
        /// The newest `node_announcement` held back for the node which is `node_id_1` of the
        /// announcement.
        latest_node_announce_a: Option<NodeAnnouncement>,
        /// The newest `node_announcement` held back for the other node of the announcement.
        latest_node_announce_b: Option<NodeAnnouncement>,
        /// The newest held-back `channel_update` whose flags have the lowest bit set.
        latest_channel_update_a: Option<ChannelUpdate>,
        /// The newest held-back `channel_update` whose flags have the lowest bit clear.
        latest_channel_update_b: Option<ChannelUpdate>,
}
119
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
///
/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
#[derive(Clone)]
pub struct UtxoFuture {
        // The only strong reference(s) to the held-back messages live in `UtxoFuture` clones; the
        // pending-check maps only store `Weak` pointers to this allocation.
        state: Arc<Mutex<UtxoMessages>>,
}
127
128 /// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
129 /// once we have a concrete resolution of a request.
130 pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
131 impl UtxoLookup for UtxoResolver {
132         fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
133                 UtxoResult::Sync(self.0.clone())
134         }
135 }
136
137 impl UtxoFuture {
138         /// Builds a new future for later resolution.
139         pub fn new() -> Self {
140                 Self { state: Arc::new(Mutex::new(UtxoMessages {
141                         complete: None,
142                         channel_announce: None,
143                         latest_node_announce_a: None,
144                         latest_node_announce_b: None,
145                         latest_channel_update_a: None,
146                         latest_channel_update_b: None,
147                 }))}
148         }
149
150         /// Resolves this future against the given `graph` and with the given `result`.
151         ///
152         /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
153         /// forwarding the validated gossip message onwards to peers.
154         ///
155         /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
156         /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
157         /// after this.
158         ///
159         /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
160         /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
161         pub fn resolve_without_forwarding<L: Deref>(&self,
162                 graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
163         where L::Target: Logger {
164                 self.do_resolve(graph, result);
165         }
166
167         /// Resolves this future against the given `graph` and with the given `result`.
168         ///
169         /// The given `gossip` is used to broadcast any validated messages onwards to all peers which
170         /// have available buffer space.
171         ///
172         /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
173         /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
174         /// after this.
175         ///
176         /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
177         /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
178         pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
179                 graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
180         ) where L::Target: Logger, U::Target: UtxoLookup {
181                 let mut res = self.do_resolve(graph, result);
182                 for msg_opt in res.iter_mut() {
183                         if let Some(msg) = msg_opt.take() {
184                                 gossip.forward_gossip_msg(msg);
185                         }
186                 }
187         }
188
189         fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
190         -> [Option<MessageSendEvent>; 5] where L::Target: Logger {
191                 let (announcement, node_a, node_b, update_a, update_b) = {
192                         let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
193                         let mut async_messages = self.state.lock().unwrap();
194
195                         if async_messages.channel_announce.is_none() {
196                                 // We raced returning to `check_channel_announcement` which hasn't updated
197                                 // `channel_announce` yet. That's okay, we can set the `complete` field which it will
198                                 // check once it gets control again.
199                                 async_messages.complete = Some(result);
200                                 return [None, None, None, None, None];
201                         }
202
203                         let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
204                                 ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
205                                 ChannelAnnouncement::Unsigned(msg) => &msg,
206                         };
207
208                         pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
209
210                         (async_messages.channel_announce.take().unwrap(),
211                                 async_messages.latest_node_announce_a.take(),
212                                 async_messages.latest_node_announce_b.take(),
213                                 async_messages.latest_channel_update_a.take(),
214                                 async_messages.latest_channel_update_b.take())
215                 };
216
217                 let mut res = [None, None, None, None, None];
218                 let mut res_idx = 0;
219
220                 // Now that we've updated our internal state, pass the pending messages back through the
221                 // network graph with a different `UtxoLookup` which will resolve immediately.
222                 // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
223                 // with them.
224                 let resolver = UtxoResolver(result);
225                 match announcement {
226                         ChannelAnnouncement::Full(signed_msg) => {
227                                 if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
228                                         res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
229                                                 msg: signed_msg, update_msg: None,
230                                         });
231                                         res_idx += 1;
232                                 }
233                         },
234                         ChannelAnnouncement::Unsigned(msg) => {
235                                 let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
236                         },
237                 }
238
239                 for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
240                         match announce {
241                                 Some(NodeAnnouncement::Full(signed_msg)) => {
242                                         if graph.update_node_from_announcement(&signed_msg).is_ok() {
243                                                 res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
244                                                         msg: signed_msg,
245                                                 });
246                                                 res_idx += 1;
247                                         }
248                                 },
249                                 Some(NodeAnnouncement::Unsigned(msg)) => {
250                                         let _ = graph.update_node_from_unsigned_announcement(&msg);
251                                 },
252                                 None => {},
253                         }
254                 }
255
256                 for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
257                         match update {
258                                 Some(ChannelUpdate::Full(signed_msg)) => {
259                                         if graph.update_channel(&signed_msg).is_ok() {
260                                                 res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
261                                                         msg: signed_msg,
262                                                 });
263                                                 res_idx += 1;
264                                         }
265                                 },
266                                 Some(ChannelUpdate::Unsigned(msg)) => {
267                                         let _ = graph.update_channel_unsigned(&msg);
268                                 },
269                                 None => {},
270                         }
271                 }
272
273                 res
274         }
275 }
276
277 struct PendingChecksContext {
278         channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
279         nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
280 }
281
282 impl PendingChecksContext {
283         fn lookup_completed(&mut self,
284                 msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
285         ) {
286                 if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
287                         if Weak::ptr_eq(e.get(), &completed_state) {
288                                 e.remove();
289                         }
290                 }
291
292                 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
293                         e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
294                         if e.get().is_empty() { e.remove(); }
295                 }
296                 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
297                         e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
298                         if e.get().is_empty() { e.remove(); }
299                 }
300         }
301 }
302
/// A set of messages which are pending UTXO lookups for processing.
pub(super) struct PendingChecks {
        // All bookkeeping lives behind this single lock. Where the state lock of an individual
        // lookup is needed as well, the code in this file takes this lock first.
        internal: Mutex<PendingChecksContext>,
}
307
308 impl PendingChecks {
309         pub(super) fn new() -> Self {
310                 PendingChecks { internal: Mutex::new(PendingChecksContext {
311                         channels: new_hash_map(), nodes: new_hash_map(),
312                 }) }
313         }
314
315         /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
316         /// and, if so, stores the channel message for handling later and returns an `Err`.
317         pub(super) fn check_hold_pending_channel_update(
318                 &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
319         ) -> Result<(), LightningError> {
320                 let mut pending_checks = self.internal.lock().unwrap();
321                 if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
322                         let is_from_a = (msg.flags & 1) == 1;
323                         match Weak::upgrade(e.get()) {
324                                 Some(msgs_ref) => {
325                                         let mut messages = msgs_ref.lock().unwrap();
326                                         let latest_update = if is_from_a {
327                                                         &mut messages.latest_channel_update_a
328                                                 } else {
329                                                         &mut messages.latest_channel_update_b
330                                                 };
331                                         if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
332                                                 // If the messages we got has a higher timestamp, just blindly assume the
333                                                 // signatures on the new message are correct and drop the old message. This
334                                                 // may cause us to end up dropping valid `channel_update`s if a peer is
335                                                 // malicious, but we should get the correct ones when the node updates them.
336                                                 *latest_update = Some(
337                                                         if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
338                                                         else { ChannelUpdate::Unsigned(msg.clone()) });
339                                         }
340                                         return Err(LightningError {
341                                                 err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
342                                                 action: ErrorAction::IgnoreAndLog(Level::Gossip),
343                                         });
344                                 },
345                                 None => { e.remove(); },
346                         }
347                 }
348                 Ok(())
349         }
350
351         /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
352         /// given node and, if so, stores the channel message for handling later and returns an `Err`.
353         pub(super) fn check_hold_pending_node_announcement(
354                 &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
355         ) -> Result<(), LightningError> {
356                 let mut pending_checks = self.internal.lock().unwrap();
357                 if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
358                         let mut found_at_least_one_chan = false;
359                         e.get_mut().retain(|node_msgs| {
360                                 match Weak::upgrade(&node_msgs) {
361                                         Some(chan_mtx) => {
362                                                 let mut chan_msgs = chan_mtx.lock().unwrap();
363                                                 if let Some(chan_announce) = &chan_msgs.channel_announce {
364                                                         let latest_announce =
365                                                                 if *chan_announce.node_id_1() == msg.node_id {
366                                                                         &mut chan_msgs.latest_node_announce_a
367                                                                 } else {
368                                                                         &mut chan_msgs.latest_node_announce_b
369                                                                 };
370                                                         if latest_announce.is_none() ||
371                                                                 latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
372                                                         {
373                                                                 *latest_announce = Some(
374                                                                         if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
375                                                                         else { NodeAnnouncement::Unsigned(msg.clone()) });
376                                                         }
377                                                         found_at_least_one_chan = true;
378                                                         true
379                                                 } else {
380                                                         debug_assert!(false, "channel_announce is set before struct is added to node map");
381                                                         false
382                                                 }
383                                         },
384                                         None => false,
385                                 }
386                         });
387                         if e.get().is_empty() { e.remove(); }
388                         if found_at_least_one_chan {
389                                 return Err(LightningError {
390                                         err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
391                                         action: ErrorAction::IgnoreAndLog(Level::Gossip),
392                                 });
393                         }
394                 }
395                 Ok(())
396         }
397
398         fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
399                 full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
400                 pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
401         ) -> Result<(), msgs::LightningError> {
402                 match pending_channels.entry(msg.short_channel_id) {
403                         hash_map::Entry::Occupied(mut e) => {
404                                 // There's already a pending lookup for the given SCID. Check if the messages
405                                 // are the same and, if so, return immediately (don't bother spawning another
406                                 // lookup if we haven't gotten that far yet).
407                                 match Weak::upgrade(&e.get()) {
408                                         Some(pending_msgs) => {
409                                                 // This may be called with the mutex held on a different UtxoMessages
410                                                 // struct, however in that case we have a global lockorder of new messages
411                                                 // -> old messages, which makes this safe.
412                                                 let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
413                                                         Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
414                                                         Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
415                                                         None => {
416                                                                 // This shouldn't actually be reachable. We set the
417                                                                 // `channel_announce` field under the same lock as setting the
418                                                                 // channel map entry. Still, we can just treat it as
419                                                                 // non-matching and let the new request fly.
420                                                                 debug_assert!(false);
421                                                                 false
422                                                         },
423                                                 };
424                                                 if pending_matches {
425                                                         return Err(LightningError {
426                                                                 err: "Channel announcement is already being checked".to_owned(),
427                                                                 action: ErrorAction::IgnoreDuplicateGossip,
428                                                         });
429                                                 } else {
430                                                         // The earlier lookup is a different message. If we have another
431                                                         // request in-flight now replace the original.
432                                                         // Note that in the replace case whether to replace is somewhat
433                                                         // arbitrary - both results will be handled, we're just updating the
434                                                         // value that will be compared to future lookups with the same SCID.
435                                                         if let Some(item) = replacement {
436                                                                 *e.get_mut() = item;
437                                                         }
438                                                 }
439                                         },
440                                         None => {
441                                                 // The earlier lookup already resolved. We can't be sure its the same
442                                                 // so just remove/replace it and move on.
443                                                 if let Some(item) = replacement {
444                                                         *e.get_mut() = item;
445                                                 } else { e.remove(); }
446                                         },
447                                 }
448                         },
449                         hash_map::Entry::Vacant(v) => {
450                                 if let Some(item) = replacement { v.insert(item); }
451                         },
452                 }
453                 Ok(())
454         }
455
456         pub(super) fn check_channel_announcement<U: Deref>(&self,
457                 utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
458                 full_msg: Option<&msgs::ChannelAnnouncement>
459         ) -> Result<Option<Amount>, msgs::LightningError> where U::Target: UtxoLookup {
460                 let handle_result = |res| {
461                         match res {
462                                 Ok(TxOut { value, script_pubkey }) => {
463                                         let expected_script =
464                                                 make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_p2wsh();
465                                         if script_pubkey != expected_script {
466                                                 return Err(LightningError{
467                                                         err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
468                                                                 expected_script.to_hex_string(), script_pubkey.to_hex_string()),
469                                                         action: ErrorAction::IgnoreError
470                                                 });
471                                         }
472                                         Ok(Some(value))
473                                 },
474                                 Err(UtxoLookupError::UnknownChain) => {
475                                         Err(LightningError {
476                                                 err: format!("Channel announced on an unknown chain ({})",
477                                                         msg.chain_hash.to_bytes().as_hex()),
478                                                 action: ErrorAction::IgnoreError
479                                         })
480                                 },
481                                 Err(UtxoLookupError::UnknownTx) => {
482                                         Err(LightningError {
483                                                 err: "Channel announced without corresponding UTXO entry".to_owned(),
484                                                 action: ErrorAction::IgnoreError
485                                         })
486                                 },
487                         }
488                 };
489
490                 Self::check_replace_previous_entry(msg, full_msg, None,
491                         &mut self.internal.lock().unwrap().channels)?;
492
493                 match utxo_lookup {
494                         &None => {
495                                 // Tentatively accept, potentially exposing us to DoS attacks
496                                 Ok(None)
497                         },
498                         &Some(ref utxo_lookup) => {
499                                 match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
500                                         UtxoResult::Sync(res) => handle_result(res),
501                                         UtxoResult::Async(future) => {
502                                                 let mut pending_checks = self.internal.lock().unwrap();
503                                                 let mut async_messages = future.state.lock().unwrap();
504                                                 if let Some(res) = async_messages.complete.take() {
505                                                         // In the unlikely event the future resolved before we managed to get it,
506                                                         // handle the result in-line.
507                                                         handle_result(res)
508                                                 } else {
509                                                         Self::check_replace_previous_entry(msg, full_msg,
510                                                                 Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
511                                                         async_messages.channel_announce = Some(
512                                                                 if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
513                                                                 else { ChannelAnnouncement::Unsigned(msg.clone()) });
514                                                         pending_checks.nodes.entry(msg.node_id_1)
515                                                                 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
516                                                         pending_checks.nodes.entry(msg.node_id_2)
517                                                                 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
518                                                         Err(LightningError {
519                                                                 err: "Channel being checked async".to_owned(),
520                                                                 action: ErrorAction::IgnoreAndLog(Level::Gossip),
521                                                         })
522                                                 }
523                                         },
524                                 }
525                         }
526                 }
527         }
528
        /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
        /// returns `true`.
        ///
        /// Note that this isn't a strict upper-bound on the number of checks pending - each peer
        /// may, at a minimum, read one more socket buffer worth of `channel_announcement`s which
        /// we'll have to process. With a socket buffer of 4KB and a minimum `channel_announcement`
        /// size of, roughly, 429 bytes, that is about 10 messages per peer, which may leave us with
        /// `10*our peer count` messages to process beyond this limit.
        ///
        /// Because we'll probably have a few peers, there's no reason for this constant to be
        /// materially less than 30 or so, and 32 in-flight checks should be more than enough for
        /// decent parallelism.
        const MAX_PENDING_LOOKUPS: usize = 32;
538
539         /// Returns true if there are a large number of async checks pending and future
540         /// `channel_announcement` messages should be delayed. Note that this is only a hint and
541         /// messages already in-flight may still have to be handled for various reasons.
542         pub(super) fn too_many_checks_pending(&self) -> bool {
543                 let mut pending_checks = self.internal.lock().unwrap();
544                 if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
545                         // If we have many channel checks pending, ensure we don't have any dangling checks
546                         // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
547                         // instead) before we commit to applying backpressure.
548                         pending_checks.channels.retain(|_, chan| {
549                                 Weak::upgrade(&chan).is_some()
550                         });
551                         pending_checks.nodes.retain(|_, channels| {
552                                 channels.retain(|chan| Weak::upgrade(&chan).is_some());
553                                 !channels.is_empty()
554                         });
555                         pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
556                 } else {
557                         false
558                 }
559         }
560 }
561
562 #[cfg(test)]
563 mod tests {
564         use super::*;
565         use crate::routing::gossip::tests::*;
566         use crate::util::test_utils::{TestChainSource, TestLogger};
567
568         use bitcoin::amount::Amount;
569         use bitcoin::secp256k1::{Secp256k1, SecretKey};
570
571         use core::sync::atomic::Ordering;
572
573         fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
574                 let logger = Box::new(TestLogger::new());
575                 let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
576                 let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
577
578                 (chain_source, network_graph)
579         }
580
581         fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
582                 NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
583                 msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
584         {
585                 let secp_ctx = Secp256k1::new();
586
587                 let (chain_source, network_graph) = get_network();
588
589                 let good_script = get_channel_script(&secp_ctx);
590                 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
591                 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
592                 let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
593
594                 let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
595                 let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
596
597                 // Note that we have to set the "direction" flag correctly on both messages
598                 let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
599                 let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
600                 let chan_update_c = get_signed_channel_update(|msg| {
601                         msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
602
603                 (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
604                         node_b_announce, chan_update_a, chan_update_b, chan_update_c)
605         }
606
607         #[test]
608         fn test_fast_async_lookup() {
609                 // Check that async lookups which resolve quicker than the future is returned to the
610                 // `get_utxo` call can read it still resolve properly.
611                 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
612
613                 let future = UtxoFuture::new();
614                 future.resolve_without_forwarding(&network_graph,
615                         Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
616                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
617
618                 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
619                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
620         }
621
622         #[test]
623         fn test_async_lookup() {
624                 // Test a simple async lookup
625                 let (valid_announcement, chain_source, network_graph, good_script,
626                         node_a_announce, node_b_announce, ..) = get_test_objects();
627
628                 let future = UtxoFuture::new();
629                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
630
631                 assert_eq!(
632                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
633                         "Channel being checked async");
634                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
635
636                 future.resolve_without_forwarding(&network_graph,
637                         Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script }));
638                 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
639                 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
640
641                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
642                         .unwrap().announcement_info.is_none());
643
644                 network_graph.update_node_from_announcement(&node_a_announce).unwrap();
645                 network_graph.update_node_from_announcement(&node_b_announce).unwrap();
646
647                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
648                         .unwrap().announcement_info.is_some());
649         }
650
651         #[test]
652         fn test_invalid_async_lookup() {
653                 // Test an async lookup which returns an incorrect script
654                 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
655
656                 let future = UtxoFuture::new();
657                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
658
659                 assert_eq!(
660                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
661                         "Channel being checked async");
662                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
663
664                 future.resolve_without_forwarding(&network_graph,
665                         Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() }));
666                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
667         }
668
669         #[test]
670         fn test_failing_async_lookup() {
671                 // Test an async lookup which returns an error
672                 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
673
674                 let future = UtxoFuture::new();
675                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
676
677                 assert_eq!(
678                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
679                         "Channel being checked async");
680                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
681
682                 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
683                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
684         }
685
686         #[test]
687         fn test_updates_async_lookup() {
688                 // Test async lookups will process pending channel_update/node_announcements once they
689                 // complete.
690                 let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
691                         node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
692
693                 let future = UtxoFuture::new();
694                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
695
696                 assert_eq!(
697                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
698                         "Channel being checked async");
699                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
700
701                 assert_eq!(
702                         network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
703                         "Awaiting channel_announcement validation to accept node_announcement");
704                 assert_eq!(
705                         network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
706                         "Awaiting channel_announcement validation to accept node_announcement");
707
708                 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
709                         "Awaiting channel_announcement validation to accept channel_update");
710                 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
711                         "Awaiting channel_announcement validation to accept channel_update");
712
713                 future.resolve_without_forwarding(&network_graph,
714                         Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
715
716                 assert!(network_graph.read_only().channels()
717                         .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
718                 assert!(network_graph.read_only().channels()
719                         .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
720
721                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
722                         .unwrap().announcement_info.is_some());
723                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
724                         .unwrap().announcement_info.is_some());
725         }
726
727         #[test]
728         fn test_latest_update_async_lookup() {
729                 // Test async lookups will process the latest channel_update if two are received while
730                 // awaiting an async UTXO lookup.
731                 let (valid_announcement, chain_source, network_graph, good_script, _,
732                         _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
733
734                 let future = UtxoFuture::new();
735                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
736
737                 assert_eq!(
738                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
739                         "Channel being checked async");
740                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
741
742                 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
743                         "Awaiting channel_announcement validation to accept channel_update");
744                 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
745                         "Awaiting channel_announcement validation to accept channel_update");
746                 assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
747                         "Awaiting channel_announcement validation to accept channel_update");
748
749                 future.resolve_without_forwarding(&network_graph,
750                         Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
751
752                 assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
753                 let graph_lock = network_graph.read_only();
754                 assert!(graph_lock.channels()
755                                 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
756                                 .one_to_two.as_ref().unwrap().last_update !=
757                         graph_lock.channels()
758                                 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
759                                 .two_to_one.as_ref().unwrap().last_update);
760         }
761
762         #[test]
763         fn test_no_double_lookups() {
764                 // Test that a pending async lookup will prevent a second async lookup from flying, but
765                 // only if the channel_announcement message is identical.
766                 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
767
768                 let future = UtxoFuture::new();
769                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
770
771                 assert_eq!(
772                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
773                         "Channel being checked async");
774                 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
775
776                 // If we make a second request with the same message, the call count doesn't increase...
777                 let future_b = UtxoFuture::new();
778                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
779                 assert_eq!(
780                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
781                         "Channel announcement is already being checked");
782                 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
783
784                 // But if we make a third request with a tweaked message, we should get a second call
785                 // against our new future...
786                 let secp_ctx = Secp256k1::new();
787                 let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
788                 let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
789                 let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
790                 assert_eq!(
791                         network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
792                         "Channel being checked async");
793                 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
794
795                 // Still, if we resolve the original future, the original channel will be accepted.
796                 future.resolve_without_forwarding(&network_graph,
797                         Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
798                 assert!(!network_graph.read_only().channels()
799                         .get(&valid_announcement.contents.short_channel_id).unwrap()
800                         .announcement_message.as_ref().unwrap()
801                         .contents.features.supports_unknown_test_feature());
802         }
803
804         #[test]
805         fn test_checks_backpressure() {
806                 // Test that too_many_checks_pending returns true when there are many checks pending, and
807                 // returns false once they complete.
808                 let secp_ctx = Secp256k1::new();
809                 let (chain_source, network_graph) = get_network();
810
811                 // We cheat and use a single future for all the lookups to complete them all at once.
812                 let future = UtxoFuture::new();
813                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
814
815                 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
816                 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
817
818                 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
819                         let valid_announcement = get_signed_channel_announcement(
820                                 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
821                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
822                         assert!(!network_graph.pending_checks.too_many_checks_pending());
823                 }
824
825                 let valid_announcement = get_signed_channel_announcement(
826                         |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
827                 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
828                 assert!(network_graph.pending_checks.too_many_checks_pending());
829
830                 // Once the future completes the "too many checks" flag should reset.
831                 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
832                 assert!(!network_graph.pending_checks.too_many_checks_pending());
833         }
834
835         #[test]
836         fn test_checks_backpressure_drop() {
837                 // Test that too_many_checks_pending returns true when there are many checks pending, and
838                 // returns false if we drop some of the futures without completion.
839                 let secp_ctx = Secp256k1::new();
840                 let (chain_source, network_graph) = get_network();
841
842                 // We cheat and use a single future for all the lookups to complete them all at once.
843                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
844
845                 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
846                 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
847
848                 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
849                         let valid_announcement = get_signed_channel_announcement(
850                                 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
851                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
852                         assert!(!network_graph.pending_checks.too_many_checks_pending());
853                 }
854
855                 let valid_announcement = get_signed_channel_announcement(
856                         |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
857                 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
858                 assert!(network_graph.pending_checks.too_many_checks_pending());
859
860                 // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
861                 // should reset to false.
862                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
863                 assert!(!network_graph.pending_checks.too_many_checks_pending());
864         }
865 }