Remove genesis block hash from public API
[rust-lightning] / lightning / src / routing / utxo.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! This module contains traits for LDK to access UTXOs to check gossip data is correct.
11 //!
12 //! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
13 //! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
14 //! order to announce a channel. This module handles that checking.
15
16 use bitcoin::{BlockHash, TxOut};
17 use bitcoin::hashes::hex::ToHex;
18
19 use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
20 use crate::ln::msgs::{self, LightningError, ErrorAction};
21 use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
22 use crate::util::events::MessageSendEvent;
23 use crate::util::logger::{Level, Logger};
24 use crate::util::ser::Writeable;
25
26 use crate::prelude::*;
27
28 use alloc::sync::{Arc, Weak};
29 use crate::sync::Mutex;
30 use core::ops::Deref;
31
/// The ways in which a chain query made through [`UtxoLookup`] can fail.
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
        /// The chain the query was made against is not known.
        UnknownChain,

        /// The transaction asked for does not exist or has not yet confirmed.
        UnknownTx,
}
41
42 /// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
43 /// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
44 /// variant.
45 #[derive(Clone)]
46 pub enum UtxoResult {
47         /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
48         /// requested or a [`UtxoLookupError`].
49         Sync(Result<TxOut, UtxoLookupError>),
50         /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
51         /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
52         ///
53         /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
54         /// but only fairly loosely. Because a pending checks block all message processing, leaving
55         /// checks pending for an extended time may cause DoS of other functions. It is recommended you
56         /// keep a tight timeout on lookups, on the order of a few seconds.
57         Async(UtxoFuture),
58 }
59
60 /// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
61 pub trait UtxoLookup {
62         /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
63         /// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
64         /// is unknown.
65         ///
66         /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
67         fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult;
68 }
69
70 enum ChannelAnnouncement {
71         Full(msgs::ChannelAnnouncement),
72         Unsigned(msgs::UnsignedChannelAnnouncement),
73 }
74 impl ChannelAnnouncement {
75         fn node_id_1(&self) -> &NodeId {
76                 match self {
77                         ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
78                         ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
79                 }
80         }
81 }
82
83 enum NodeAnnouncement {
84         Full(msgs::NodeAnnouncement),
85         Unsigned(msgs::UnsignedNodeAnnouncement),
86 }
87 impl NodeAnnouncement {
88         fn timestamp(&self) -> u32 {
89                 match self {
90                         NodeAnnouncement::Full(msg) => msg.contents.timestamp,
91                         NodeAnnouncement::Unsigned(msg) => msg.timestamp,
92                 }
93         }
94 }
95
96 enum ChannelUpdate {
97         Full(msgs::ChannelUpdate),
98         Unsigned(msgs::UnsignedChannelUpdate),
99 }
100 impl ChannelUpdate {
101         fn timestamp(&self) -> u32 {
102                 match self {
103                         ChannelUpdate::Full(msg) => msg.contents.timestamp,
104                         ChannelUpdate::Unsigned(msg) => msg.timestamp,
105                 }
106         }
107 }
108
109 struct UtxoMessages {
110         complete: Option<Result<TxOut, UtxoLookupError>>,
111         channel_announce: Option<ChannelAnnouncement>,
112         latest_node_announce_a: Option<NodeAnnouncement>,
113         latest_node_announce_b: Option<NodeAnnouncement>,
114         latest_channel_update_a: Option<ChannelUpdate>,
115         latest_channel_update_b: Option<ChannelUpdate>,
116 }
117
118 /// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
119 ///
120 /// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
121 #[derive(Clone)]
122 pub struct UtxoFuture {
123         state: Arc<Mutex<UtxoMessages>>,
124 }
125
126 /// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
127 /// once we have a concrete resolution of a request.
128 struct UtxoResolver(Result<TxOut, UtxoLookupError>);
129 impl UtxoLookup for UtxoResolver {
130         fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
131                 UtxoResult::Sync(self.0.clone())
132         }
133 }
134
135 impl UtxoFuture {
136         /// Builds a new future for later resolution.
137         pub fn new() -> Self {
138                 Self { state: Arc::new(Mutex::new(UtxoMessages {
139                         complete: None,
140                         channel_announce: None,
141                         latest_node_announce_a: None,
142                         latest_node_announce_b: None,
143                         latest_channel_update_a: None,
144                         latest_channel_update_b: None,
145                 }))}
146         }
147
148         /// Resolves this future against the given `graph` and with the given `result`.
149         ///
150         /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
151         /// forwarding the validated gossip message onwards to peers.
152         ///
153         /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
154         /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
155         /// after this.
156         ///
157         /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
158         /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
159         pub fn resolve_without_forwarding<L: Deref>(&self,
160                 graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
161         where L::Target: Logger {
162                 self.do_resolve(graph, result);
163         }
164
165         /// Resolves this future against the given `graph` and with the given `result`.
166         ///
167         /// The given `gossip` is used to broadcast any validated messages onwards to all peers which
168         /// have available buffer space.
169         ///
170         /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
171         /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
172         /// after this.
173         ///
174         /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
175         /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
176         pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
177                 graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
178         ) where L::Target: Logger, U::Target: UtxoLookup {
179                 let mut res = self.do_resolve(graph, result);
180                 for msg_opt in res.iter_mut() {
181                         if let Some(msg) = msg_opt.take() {
182                                 gossip.forward_gossip_msg(msg);
183                         }
184                 }
185         }
186
187         fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
188         -> [Option<MessageSendEvent>; 5] where L::Target: Logger {
189                 let (announcement, node_a, node_b, update_a, update_b) = {
190                         let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
191                         let mut async_messages = self.state.lock().unwrap();
192
193                         if async_messages.channel_announce.is_none() {
194                                 // We raced returning to `check_channel_announcement` which hasn't updated
195                                 // `channel_announce` yet. That's okay, we can set the `complete` field which it will
196                                 // check once it gets control again.
197                                 async_messages.complete = Some(result);
198                                 return [None, None, None, None, None];
199                         }
200
201                         let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
202                                 ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
203                                 ChannelAnnouncement::Unsigned(msg) => &msg,
204                         };
205
206                         pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
207
208                         (async_messages.channel_announce.take().unwrap(),
209                                 async_messages.latest_node_announce_a.take(),
210                                 async_messages.latest_node_announce_b.take(),
211                                 async_messages.latest_channel_update_a.take(),
212                                 async_messages.latest_channel_update_b.take())
213                 };
214
215                 let mut res = [None, None, None, None, None];
216                 let mut res_idx = 0;
217
218                 // Now that we've updated our internal state, pass the pending messages back through the
219                 // network graph with a different `UtxoLookup` which will resolve immediately.
220                 // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
221                 // with them.
222                 let resolver = UtxoResolver(result);
223                 match announcement {
224                         ChannelAnnouncement::Full(signed_msg) => {
225                                 if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
226                                         res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
227                                                 msg: signed_msg, update_msg: None,
228                                         });
229                                         res_idx += 1;
230                                 }
231                         },
232                         ChannelAnnouncement::Unsigned(msg) => {
233                                 let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
234                         },
235                 }
236
237                 for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
238                         match announce {
239                                 Some(NodeAnnouncement::Full(signed_msg)) => {
240                                         if graph.update_node_from_announcement(&signed_msg).is_ok() {
241                                                 res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
242                                                         msg: signed_msg,
243                                                 });
244                                                 res_idx += 1;
245                                         }
246                                 },
247                                 Some(NodeAnnouncement::Unsigned(msg)) => {
248                                         let _ = graph.update_node_from_unsigned_announcement(&msg);
249                                 },
250                                 None => {},
251                         }
252                 }
253
254                 for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
255                         match update {
256                                 Some(ChannelUpdate::Full(signed_msg)) => {
257                                         if graph.update_channel(&signed_msg).is_ok() {
258                                                 res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
259                                                         msg: signed_msg,
260                                                 });
261                                                 res_idx += 1;
262                                         }
263                                 },
264                                 Some(ChannelUpdate::Unsigned(msg)) => {
265                                         let _ = graph.update_channel_unsigned(&msg);
266                                 },
267                                 None => {},
268                         }
269                 }
270
271                 res
272         }
273 }
274
275 struct PendingChecksContext {
276         channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
277         nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
278 }
279
280 impl PendingChecksContext {
281         fn lookup_completed(&mut self,
282                 msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
283         ) {
284                 if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
285                         if Weak::ptr_eq(e.get(), &completed_state) {
286                                 e.remove();
287                         }
288                 }
289
290                 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
291                         e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
292                         if e.get().is_empty() { e.remove(); }
293                 }
294                 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
295                         e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
296                         if e.get().is_empty() { e.remove(); }
297                 }
298         }
299 }
300
301 /// A set of messages which are pending UTXO lookups for processing.
302 pub(super) struct PendingChecks {
303         internal: Mutex<PendingChecksContext>,
304 }
305
306 impl PendingChecks {
307         pub(super) fn new() -> Self {
308                 PendingChecks { internal: Mutex::new(PendingChecksContext {
309                         channels: HashMap::new(), nodes: HashMap::new(),
310                 }) }
311         }
312
313         /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
314         /// and, if so, stores the channel message for handling later and returns an `Err`.
315         pub(super) fn check_hold_pending_channel_update(
316                 &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
317         ) -> Result<(), LightningError> {
318                 let mut pending_checks = self.internal.lock().unwrap();
319                 if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
320                         let is_from_a = (msg.flags & 1) == 1;
321                         match Weak::upgrade(e.get()) {
322                                 Some(msgs_ref) => {
323                                         let mut messages = msgs_ref.lock().unwrap();
324                                         let latest_update = if is_from_a {
325                                                         &mut messages.latest_channel_update_a
326                                                 } else {
327                                                         &mut messages.latest_channel_update_b
328                                                 };
329                                         if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
330                                                 // If the messages we got has a higher timestamp, just blindly assume the
331                                                 // signatures on the new message are correct and drop the old message. This
332                                                 // may cause us to end up dropping valid `channel_update`s if a peer is
333                                                 // malicious, but we should get the correct ones when the node updates them.
334                                                 *latest_update = Some(
335                                                         if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
336                                                         else { ChannelUpdate::Unsigned(msg.clone()) });
337                                         }
338                                         return Err(LightningError {
339                                                 err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
340                                                 action: ErrorAction::IgnoreAndLog(Level::Gossip),
341                                         });
342                                 },
343                                 None => { e.remove(); },
344                         }
345                 }
346                 Ok(())
347         }
348
349         /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
350         /// given node and, if so, stores the channel message for handling later and returns an `Err`.
351         pub(super) fn check_hold_pending_node_announcement(
352                 &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
353         ) -> Result<(), LightningError> {
354                 let mut pending_checks = self.internal.lock().unwrap();
355                 if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
356                         let mut found_at_least_one_chan = false;
357                         e.get_mut().retain(|node_msgs| {
358                                 match Weak::upgrade(&node_msgs) {
359                                         Some(chan_mtx) => {
360                                                 let mut chan_msgs = chan_mtx.lock().unwrap();
361                                                 if let Some(chan_announce) = &chan_msgs.channel_announce {
362                                                         let latest_announce =
363                                                                 if *chan_announce.node_id_1() == msg.node_id {
364                                                                         &mut chan_msgs.latest_node_announce_a
365                                                                 } else {
366                                                                         &mut chan_msgs.latest_node_announce_b
367                                                                 };
368                                                         if latest_announce.is_none() ||
369                                                                 latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
370                                                         {
371                                                                 *latest_announce = Some(
372                                                                         if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
373                                                                         else { NodeAnnouncement::Unsigned(msg.clone()) });
374                                                         }
375                                                         found_at_least_one_chan = true;
376                                                         true
377                                                 } else {
378                                                         debug_assert!(false, "channel_announce is set before struct is added to node map");
379                                                         false
380                                                 }
381                                         },
382                                         None => false,
383                                 }
384                         });
385                         if e.get().is_empty() { e.remove(); }
386                         if found_at_least_one_chan {
387                                 return Err(LightningError {
388                                         err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
389                                         action: ErrorAction::IgnoreAndLog(Level::Gossip),
390                                 });
391                         }
392                 }
393                 Ok(())
394         }
395
396         fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
397                 full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
398                 pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
399         ) -> Result<(), msgs::LightningError> {
400                 match pending_channels.entry(msg.short_channel_id) {
401                         hash_map::Entry::Occupied(mut e) => {
402                                 // There's already a pending lookup for the given SCID. Check if the messages
403                                 // are the same and, if so, return immediately (don't bother spawning another
404                                 // lookup if we haven't gotten that far yet).
405                                 match Weak::upgrade(&e.get()) {
406                                         Some(pending_msgs) => {
407                                                 let pending_matches = match &pending_msgs.lock().unwrap().channel_announce {
408                                                         Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
409                                                         Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
410                                                         None => {
411                                                                 // This shouldn't actually be reachable. We set the
412                                                                 // `channel_announce` field under the same lock as setting the
413                                                                 // channel map entry. Still, we can just treat it as
414                                                                 // non-matching and let the new request fly.
415                                                                 debug_assert!(false);
416                                                                 false
417                                                         },
418                                                 };
419                                                 if pending_matches {
420                                                         return Err(LightningError {
421                                                                 err: "Channel announcement is already being checked".to_owned(),
422                                                                 action: ErrorAction::IgnoreDuplicateGossip,
423                                                         });
424                                                 } else {
425                                                         // The earlier lookup is a different message. If we have another
426                                                         // request in-flight now replace the original.
427                                                         // Note that in the replace case whether to replace is somewhat
428                                                         // arbitrary - both results will be handled, we're just updating the
429                                                         // value that will be compared to future lookups with the same SCID.
430                                                         if let Some(item) = replacement {
431                                                                 *e.get_mut() = item;
432                                                         }
433                                                 }
434                                         },
435                                         None => {
436                                                 // The earlier lookup already resolved. We can't be sure its the same
437                                                 // so just remove/replace it and move on.
438                                                 if let Some(item) = replacement {
439                                                         *e.get_mut() = item;
440                                                 } else { e.remove(); }
441                                         },
442                                 }
443                         },
444                         hash_map::Entry::Vacant(v) => {
445                                 if let Some(item) = replacement { v.insert(item); }
446                         },
447                 }
448                 Ok(())
449         }
450
451         pub(super) fn check_channel_announcement<U: Deref>(&self,
452                 utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
453                 full_msg: Option<&msgs::ChannelAnnouncement>
454         ) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
455                 let handle_result = |res| {
456                         match res {
457                                 Ok(TxOut { value, script_pubkey }) => {
458                                         let expected_script =
459                                                 make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh();
460                                         if script_pubkey != expected_script {
461                                                 return Err(LightningError{
462                                                         err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
463                                                                 expected_script.to_hex(), script_pubkey.to_hex()),
464                                                         action: ErrorAction::IgnoreError
465                                                 });
466                                         }
467                                         Ok(Some(value))
468                                 },
469                                 Err(UtxoLookupError::UnknownChain) => {
470                                         Err(LightningError {
471                                                 err: format!("Channel announced on an unknown chain ({})",
472                                                         msg.chain_hash.encode().to_hex()),
473                                                 action: ErrorAction::IgnoreError
474                                         })
475                                 },
476                                 Err(UtxoLookupError::UnknownTx) => {
477                                         Err(LightningError {
478                                                 err: "Channel announced without corresponding UTXO entry".to_owned(),
479                                                 action: ErrorAction::IgnoreError
480                                         })
481                                 },
482                         }
483                 };
484
485                 Self::check_replace_previous_entry(msg, full_msg, None,
486                         &mut self.internal.lock().unwrap().channels)?;
487
488                 match utxo_lookup {
489                         &None => {
490                                 // Tentatively accept, potentially exposing us to DoS attacks
491                                 Ok(None)
492                         },
493                         &Some(ref utxo_lookup) => {
494                                 match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
495                                         UtxoResult::Sync(res) => handle_result(res),
496                                         UtxoResult::Async(future) => {
497                                                 let mut pending_checks = self.internal.lock().unwrap();
498                                                 let mut async_messages = future.state.lock().unwrap();
499                                                 if let Some(res) = async_messages.complete.take() {
500                                                         // In the unlikely event the future resolved before we managed to get it,
501                                                         // handle the result in-line.
502                                                         handle_result(res)
503                                                 } else {
504                                                         Self::check_replace_previous_entry(msg, full_msg,
505                                                                 Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
506                                                         async_messages.channel_announce = Some(
507                                                                 if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
508                                                                 else { ChannelAnnouncement::Unsigned(msg.clone()) });
509                                                         pending_checks.nodes.entry(msg.node_id_1)
510                                                                 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
511                                                         pending_checks.nodes.entry(msg.node_id_2)
512                                                                 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
513                                                         Err(LightningError {
514                                                                 err: "Channel being checked async".to_owned(),
515                                                                 action: ErrorAction::IgnoreAndLog(Level::Gossip),
516                                                         })
517                                                 }
518                                         },
519                                 }
520                         }
521                 }
522         }
523
        /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
        /// returns `true`. The comparison there is strict, so backpressure is only signalled once
        /// *more than* this many channel checks are pending.
        ///
        /// Note that this isn't a strict upper-bound on the number of checks pending - each peer
        /// may, at a minimum, read one more socket buffer worth of `channel_announcement`s which
        /// we'll have to process. With a socket buffer of 4KB and a minimum `channel_announcement`
        /// size of, roughly, 429 bytes, this may leave us with `10*our peer count` messages to
        /// process beyond this limit. Because we'll probably have a few peers, there's no reason
        /// for this constant to be materially less than 30 or so, and 32 in-flight checks should
        /// be more than enough for decent parallelism.
        const MAX_PENDING_LOOKUPS: usize = 32;
533
534         /// Returns true if there are a large number of async checks pending and future
535         /// `channel_announcement` messages should be delayed. Note that this is only a hint and
536         /// messages already in-flight may still have to be handled for various reasons.
537         pub(super) fn too_many_checks_pending(&self) -> bool {
538                 let mut pending_checks = self.internal.lock().unwrap();
539                 if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
540                         // If we have many channel checks pending, ensure we don't have any dangling checks
541                         // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
542                         // instead) before we commit to applying backpressure.
543                         pending_checks.channels.retain(|_, chan| {
544                                 Weak::upgrade(&chan).is_some()
545                         });
546                         pending_checks.nodes.retain(|_, channels| {
547                                 channels.retain(|chan| Weak::upgrade(&chan).is_some());
548                                 !channels.is_empty()
549                         });
550                         pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
551                 } else {
552                         false
553                 }
554         }
555 }
556
557 #[cfg(test)]
558 mod tests {
559         use super::*;
560         use crate::routing::gossip::tests::*;
561         use crate::util::test_utils::{TestChainSource, TestLogger};
562         use crate::ln::msgs;
563
564         use bitcoin::secp256k1::{Secp256k1, SecretKey};
565
566         use core::sync::atomic::Ordering;
567
568         fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
569                 let logger = Box::new(TestLogger::new());
570                 let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
571                 let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
572
573                 (chain_source, network_graph)
574         }
575
576         fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
577                 NetworkGraph<Box<TestLogger>>, bitcoin::Script, msgs::NodeAnnouncement,
578                 msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
579         {
580                 let secp_ctx = Secp256k1::new();
581
582                 let (chain_source, network_graph) = get_network();
583
584                 let good_script = get_channel_script(&secp_ctx);
585                 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
586                 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
587                 let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
588
589                 let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
590                 let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
591
592                 // Note that we have to set the "direction" flag correctly on both messages
593                 let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
594                 let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
595                 let chan_update_c = get_signed_channel_update(|msg| {
596                         msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
597
598                 (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
599                         node_b_announce, chan_update_a, chan_update_b, chan_update_c)
600         }
601
602         #[test]
603         fn test_fast_async_lookup() {
604                 // Check that async lookups which resolve quicker than the future is returned to the
605                 // `get_utxo` call can read it still resolve properly.
606                 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
607
608                 let future = UtxoFuture::new();
609                 future.resolve_without_forwarding(&network_graph,
610                         Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
611                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
612
613                 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
614                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
615         }
616
617         #[test]
618         fn test_async_lookup() {
619                 // Test a simple async lookup
620                 let (valid_announcement, chain_source, network_graph, good_script,
621                         node_a_announce, node_b_announce, ..) = get_test_objects();
622
623                 let future = UtxoFuture::new();
624                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
625
626                 assert_eq!(
627                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
628                         "Channel being checked async");
629                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
630
631                 future.resolve_without_forwarding(&network_graph,
632                         Ok(TxOut { value: 0, script_pubkey: good_script }));
633                 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
634                 network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
635
636                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
637                         .unwrap().announcement_info.is_none());
638
639                 network_graph.update_node_from_announcement(&node_a_announce).unwrap();
640                 network_graph.update_node_from_announcement(&node_b_announce).unwrap();
641
642                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
643                         .unwrap().announcement_info.is_some());
644         }
645
646         #[test]
647         fn test_invalid_async_lookup() {
648                 // Test an async lookup which returns an incorrect script
649                 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
650
651                 let future = UtxoFuture::new();
652                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
653
654                 assert_eq!(
655                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
656                         "Channel being checked async");
657                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
658
659                 future.resolve_without_forwarding(&network_graph,
660                         Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() }));
661                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
662         }
663
664         #[test]
665         fn test_failing_async_lookup() {
666                 // Test an async lookup which returns an error
667                 let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
668
669                 let future = UtxoFuture::new();
670                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
671
672                 assert_eq!(
673                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
674                         "Channel being checked async");
675                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
676
677                 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
678                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
679         }
680
681         #[test]
682         fn test_updates_async_lookup() {
683                 // Test async lookups will process pending channel_update/node_announcements once they
684                 // complete.
685                 let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
686                         node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
687
688                 let future = UtxoFuture::new();
689                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
690
691                 assert_eq!(
692                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
693                         "Channel being checked async");
694                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
695
696                 assert_eq!(
697                         network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
698                         "Awaiting channel_announcement validation to accept node_announcement");
699                 assert_eq!(
700                         network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
701                         "Awaiting channel_announcement validation to accept node_announcement");
702
703                 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
704                         "Awaiting channel_announcement validation to accept channel_update");
705                 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
706                         "Awaiting channel_announcement validation to accept channel_update");
707
708                 future.resolve_without_forwarding(&network_graph,
709                         Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
710
711                 assert!(network_graph.read_only().channels()
712                         .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
713                 assert!(network_graph.read_only().channels()
714                         .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
715
716                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
717                         .unwrap().announcement_info.is_some());
718                 assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
719                         .unwrap().announcement_info.is_some());
720         }
721
722         #[test]
723         fn test_latest_update_async_lookup() {
724                 // Test async lookups will process the latest channel_update if two are received while
725                 // awaiting an async UTXO lookup.
726                 let (valid_announcement, chain_source, network_graph, good_script, _,
727                         _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
728
729                 let future = UtxoFuture::new();
730                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
731
732                 assert_eq!(
733                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
734                         "Channel being checked async");
735                 assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
736
737                 assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
738                         "Awaiting channel_announcement validation to accept channel_update");
739                 assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
740                         "Awaiting channel_announcement validation to accept channel_update");
741                 assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
742                         "Awaiting channel_announcement validation to accept channel_update");
743
744                 future.resolve_without_forwarding(&network_graph,
745                         Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
746
747                 assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
748                 assert!(network_graph.read_only().channels()
749                                 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
750                                 .one_to_two.as_ref().unwrap().last_update !=
751                         network_graph.read_only().channels()
752                                 .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
753                                 .two_to_one.as_ref().unwrap().last_update);
754         }
755
756         #[test]
757         fn test_no_double_lookups() {
758                 // Test that a pending async lookup will prevent a second async lookup from flying, but
759                 // only if the channel_announcement message is identical.
760                 let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
761
762                 let future = UtxoFuture::new();
763                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
764
765                 assert_eq!(
766                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
767                         "Channel being checked async");
768                 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
769
770                 // If we make a second request with the same message, the call count doesn't increase...
771                 let future_b = UtxoFuture::new();
772                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
773                 assert_eq!(
774                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
775                         "Channel announcement is already being checked");
776                 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
777
778                 // But if we make a third request with a tweaked message, we should get a second call
779                 // against our new future...
780                 let secp_ctx = Secp256k1::new();
781                 let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
782                 let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
783                 let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
784                 assert_eq!(
785                         network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
786                         "Channel being checked async");
787                 assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
788
789                 // Still, if we resolve the original future, the original channel will be accepted.
790                 future.resolve_without_forwarding(&network_graph,
791                         Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
792                 assert!(!network_graph.read_only().channels()
793                         .get(&valid_announcement.contents.short_channel_id).unwrap()
794                         .announcement_message.as_ref().unwrap()
795                         .contents.features.supports_unknown_test_feature());
796         }
797
798         #[test]
799         fn test_checks_backpressure() {
800                 // Test that too_many_checks_pending returns true when there are many checks pending, and
801                 // returns false once they complete.
802                 let secp_ctx = Secp256k1::new();
803                 let (chain_source, network_graph) = get_network();
804
805                 // We cheat and use a single future for all the lookups to complete them all at once.
806                 let future = UtxoFuture::new();
807                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
808
809                 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
810                 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
811
812                 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
813                         let valid_announcement = get_signed_channel_announcement(
814                                 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
815                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
816                         assert!(!network_graph.pending_checks.too_many_checks_pending());
817                 }
818
819                 let valid_announcement = get_signed_channel_announcement(
820                         |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
821                 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
822                 assert!(network_graph.pending_checks.too_many_checks_pending());
823
824                 // Once the future completes the "too many checks" flag should reset.
825                 future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
826                 assert!(!network_graph.pending_checks.too_many_checks_pending());
827         }
828
829         #[test]
830         fn test_checks_backpressure_drop() {
831                 // Test that too_many_checks_pending returns true when there are many checks pending, and
832                 // returns false if we drop some of the futures without completion.
833                 let secp_ctx = Secp256k1::new();
834                 let (chain_source, network_graph) = get_network();
835
836                 // We cheat and use a single future for all the lookups to complete them all at once.
837                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
838
839                 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
840                 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
841
842                 for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
843                         let valid_announcement = get_signed_channel_announcement(
844                                 |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
845                         network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
846                         assert!(!network_graph.pending_checks.too_many_checks_pending());
847                 }
848
849                 let valid_announcement = get_signed_channel_announcement(
850                         |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
851                 network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
852                 assert!(network_graph.pending_checks.too_many_checks_pending());
853
854                 // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
855                 // should reset to false.
856                 *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
857                 assert!(!network_graph.pending_checks.too_many_checks_pending());
858         }
859 }