Process `channel_update`/`node_announcement` async if needed
[rust-lightning] / lightning / src / routing / utxo.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! This module contains traits for LDK to access UTXOs to check gossip data is correct.
11 //!
12 //! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
13 //! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
14 //! order to announce a channel. This module handles that checking.
15
16 use bitcoin::{BlockHash, TxOut};
17 use bitcoin::hashes::hex::ToHex;
18
19 use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
20 use crate::ln::msgs::{self, LightningError, ErrorAction};
21 use crate::routing::gossip::{NetworkGraph, NodeId};
22 use crate::util::logger::{Level, Logger};
23 use crate::util::ser::Writeable;
24
25 use crate::prelude::*;
26
27 use alloc::sync::{Arc, Weak};
28 use crate::sync::Mutex;
29 use core::ops::Deref;
30
31 /// An error when accessing the chain via [`UtxoLookup`].
32 #[derive(Clone, Debug)]
33 pub enum UtxoLookupError {
34         /// The requested chain is unknown.
35         UnknownChain,
36
37         /// The requested transaction doesn't exist or hasn't confirmed.
38         UnknownTx,
39 }
40
41 /// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
42 /// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
43 /// variant.
44 pub enum UtxoResult {
45         /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
46         /// requested or a [`UtxoLookupError`].
47         Sync(Result<TxOut, UtxoLookupError>),
48         /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
49         /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
50         ///
51         /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
52         /// but only fairly loosely. Because a pending checks block all message processing, leaving
53         /// checks pending for an extended time may cause DoS of other functions. It is recommended you
54         /// keep a tight timeout on lookups, on the order of a few seconds.
55         Async(UtxoFuture),
56 }
57
58 /// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
59 pub trait UtxoLookup {
60         /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
61         /// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
62         /// is unknown.
63         ///
64         /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
65         fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult;
66 }
67
68 enum ChannelAnnouncement {
69         Full(msgs::ChannelAnnouncement),
70         Unsigned(msgs::UnsignedChannelAnnouncement),
71 }
72 impl ChannelAnnouncement {
73         fn node_id_1(&self) -> &NodeId {
74                 match self {
75                         ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
76                         ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
77                 }
78         }
79 }
80
81 enum NodeAnnouncement {
82         Full(msgs::NodeAnnouncement),
83         Unsigned(msgs::UnsignedNodeAnnouncement),
84 }
85 impl NodeAnnouncement {
86         fn timestamp(&self) -> u32 {
87                 match self {
88                         NodeAnnouncement::Full(msg) => msg.contents.timestamp,
89                         NodeAnnouncement::Unsigned(msg) => msg.timestamp,
90                 }
91         }
92 }
93
94 enum ChannelUpdate {
95         Full(msgs::ChannelUpdate),
96         Unsigned(msgs::UnsignedChannelUpdate),
97 }
98 impl ChannelUpdate {
99         fn timestamp(&self) -> u32 {
100                 match self {
101                         ChannelUpdate::Full(msg) => msg.contents.timestamp,
102                         ChannelUpdate::Unsigned(msg) => msg.timestamp,
103                 }
104         }
105 }
106
107 struct UtxoMessages {
108         complete: Option<Result<TxOut, UtxoLookupError>>,
109         channel_announce: Option<ChannelAnnouncement>,
110         latest_node_announce_a: Option<NodeAnnouncement>,
111         latest_node_announce_b: Option<NodeAnnouncement>,
112         latest_channel_update_a: Option<ChannelUpdate>,
113         latest_channel_update_b: Option<ChannelUpdate>,
114 }
115
116 /// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
117 ///
118 /// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
119 #[derive(Clone)]
120 pub struct UtxoFuture {
121         state: Arc<Mutex<UtxoMessages>>,
122 }
123
124 /// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
125 /// once we have a concrete resolution of a request.
126 struct UtxoResolver(Result<TxOut, UtxoLookupError>);
127 impl UtxoLookup for UtxoResolver {
128         fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
129                 UtxoResult::Sync(self.0.clone())
130         }
131 }
132
133 impl UtxoFuture {
134         /// Builds a new future for later resolution.
135         pub fn new() -> Self {
136                 Self { state: Arc::new(Mutex::new(UtxoMessages {
137                         complete: None,
138                         channel_announce: None,
139                         latest_node_announce_a: None,
140                         latest_node_announce_b: None,
141                         latest_channel_update_a: None,
142                         latest_channel_update_b: None,
143                 }))}
144         }
145
146         /// Resolves this future against the given `graph` and with the given `result`.
147         pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
148         where L::Target: Logger {
149                 let (announcement, node_a, node_b, update_a, update_b) = {
150                         let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
151                         let mut async_messages = self.state.lock().unwrap();
152
153                         if async_messages.channel_announce.is_none() {
154                                 // We raced returning to `check_channel_announcement` which hasn't updated
155                                 // `channel_announce` yet. That's okay, we can set the `complete` field which it will
156                                 // check once it gets control again.
157                                 async_messages.complete = Some(result);
158                                 return;
159                         }
160
161                         let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
162                                 ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
163                                 ChannelAnnouncement::Unsigned(msg) => &msg,
164                         };
165
166                         pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
167
168                         (async_messages.channel_announce.take().unwrap(),
169                                 async_messages.latest_node_announce_a.take(),
170                                 async_messages.latest_node_announce_b.take(),
171                                 async_messages.latest_channel_update_a.take(),
172                                 async_messages.latest_channel_update_b.take())
173                 };
174
175                 // Now that we've updated our internal state, pass the pending messages back through the
176                 // network graph with a different `UtxoLookup` which will resolve immediately.
177                 // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
178                 // with them.
179                 let resolver = UtxoResolver(result);
180                 match announcement {
181                         ChannelAnnouncement::Full(signed_msg) => {
182                                 let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
183                         },
184                         ChannelAnnouncement::Unsigned(msg) => {
185                                 let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
186                         },
187                 }
188
189                 for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
190                         match announce {
191                                 Some(NodeAnnouncement::Full(signed_msg)) => {
192                                         let _ = graph.update_node_from_announcement(&signed_msg);
193                                 },
194                                 Some(NodeAnnouncement::Unsigned(msg)) => {
195                                         let _ = graph.update_node_from_unsigned_announcement(&msg);
196                                 },
197                                 None => {},
198                         }
199                 }
200
201                 for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
202                         match update {
203                                 Some(ChannelUpdate::Full(signed_msg)) => {
204                                         let _ = graph.update_channel(&signed_msg);
205                                 },
206                                 Some(ChannelUpdate::Unsigned(msg)) => {
207                                         let _ = graph.update_channel_unsigned(&msg);
208                                 },
209                                 None => {},
210                         }
211                 }
212         }
213 }
214
215 struct PendingChecksContext {
216         channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
217         nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
218 }
219
220 impl PendingChecksContext {
221         fn lookup_completed(&mut self,
222                 msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
223         ) {
224                 if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
225                         if Weak::ptr_eq(e.get(), &completed_state) {
226                                 e.remove();
227                         }
228                 }
229
230                 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
231                         e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
232                         if e.get().is_empty() { e.remove(); }
233                 }
234                 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
235                         e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
236                         if e.get().is_empty() { e.remove(); }
237                 }
238         }
239 }
240
241 /// A set of messages which are pending UTXO lookups for processing.
242 pub(super) struct PendingChecks {
243         internal: Mutex<PendingChecksContext>,
244 }
245
246 impl PendingChecks {
247         pub(super) fn new() -> Self {
248                 PendingChecks { internal: Mutex::new(PendingChecksContext {
249                         channels: HashMap::new(), nodes: HashMap::new(),
250                 }) }
251         }
252
253         /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
254         /// and, if so, stores the channel message for handling later and returns an `Err`.
255         pub(super) fn check_hold_pending_channel_update(
256                 &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
257         ) -> Result<(), LightningError> {
258                 let mut pending_checks = self.internal.lock().unwrap();
259                 if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
260                         let is_from_a = (msg.flags & 1) == 1;
261                         match Weak::upgrade(e.get()) {
262                                 Some(msgs_ref) => {
263                                         let mut messages = msgs_ref.lock().unwrap();
264                                         let latest_update = if is_from_a {
265                                                         &mut messages.latest_channel_update_a
266                                                 } else {
267                                                         &mut messages.latest_channel_update_b
268                                                 };
269                                         if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
270                                                 // If the messages we got has a higher timestamp, just blindly assume the
271                                                 // signatures on the new message are correct and drop the old message. This
272                                                 // may cause us to end up dropping valid `channel_update`s if a peer is
273                                                 // malicious, but we should get the correct ones when the node updates them.
274                                                 *latest_update = Some(
275                                                         if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
276                                                         else { ChannelUpdate::Unsigned(msg.clone()) });
277                                         }
278                                         return Err(LightningError {
279                                                 err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
280                                                 action: ErrorAction::IgnoreAndLog(Level::Gossip),
281                                         });
282                                 },
283                                 None => { e.remove(); },
284                         }
285                 }
286                 Ok(())
287         }
288
289         /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
290         /// given node and, if so, stores the channel message for handling later and returns an `Err`.
291         pub(super) fn check_hold_pending_node_announcement(
292                 &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
293         ) -> Result<(), LightningError> {
294                 let mut pending_checks = self.internal.lock().unwrap();
295                 if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
296                         let mut found_at_least_one_chan = false;
297                         e.get_mut().retain(|node_msgs| {
298                                 match Weak::upgrade(&node_msgs) {
299                                         Some(chan_mtx) => {
300                                                 let mut chan_msgs = chan_mtx.lock().unwrap();
301                                                 if let Some(chan_announce) = &chan_msgs.channel_announce {
302                                                         let latest_announce =
303                                                                 if *chan_announce.node_id_1() == msg.node_id {
304                                                                         &mut chan_msgs.latest_node_announce_a
305                                                                 } else {
306                                                                         &mut chan_msgs.latest_node_announce_b
307                                                                 };
308                                                         if latest_announce.is_none() ||
309                                                                 latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
310                                                         {
311                                                                 // If the messages we got has a higher timestamp, just blindly
312                                                                 // assume the signatures on the new message are correct and drop
313                                                                 // the old message. This may cause us to end up dropping valid
314                                                                 // `node_announcement`s if a peer is malicious, but we should get
315                                                                 // the correct ones when the node updates them.
316                                                                 *latest_announce = Some(
317                                                                         if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
318                                                                         else { NodeAnnouncement::Unsigned(msg.clone()) });
319                                                         }
320                                                         found_at_least_one_chan = true;
321                                                         true
322                                                 } else {
323                                                         debug_assert!(false, "channel_announce is set before struct is added to node map");
324                                                         false
325                                                 }
326                                         },
327                                         None => false,
328                                 }
329                         });
330                         if e.get().is_empty() { e.remove(); }
331                         if found_at_least_one_chan {
332                                 return Err(LightningError {
333                                         err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
334                                         action: ErrorAction::IgnoreAndLog(Level::Gossip),
335                                 });
336                         }
337                 }
338                 Ok(())
339         }
340
341         fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
342                 full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
343                 pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
344         ) -> Result<(), msgs::LightningError> {
345                 match pending_channels.entry(msg.short_channel_id) {
346                         hash_map::Entry::Occupied(mut e) => {
347                                 // There's already a pending lookup for the given SCID. Check if the messages
348                                 // are the same and, if so, return immediately (don't bother spawning another
349                                 // lookup if we haven't gotten that far yet).
350                                 match Weak::upgrade(&e.get()) {
351                                         Some(pending_msgs) => {
352                                                 let pending_matches = match &pending_msgs.lock().unwrap().channel_announce {
353                                                         Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
354                                                         Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
355                                                         None => {
356                                                                 // This shouldn't actually be reachable. We set the
357                                                                 // `channel_announce` field under the same lock as setting the
358                                                                 // channel map entry. Still, we can just treat it as
359                                                                 // non-matching and let the new request fly.
360                                                                 debug_assert!(false);
361                                                                 false
362                                                         },
363                                                 };
364                                                 if pending_matches {
365                                                         return Err(LightningError {
366                                                                 err: "Channel announcement is already being checked".to_owned(),
367                                                                 action: ErrorAction::IgnoreDuplicateGossip,
368                                                         });
369                                                 } else {
370                                                         // The earlier lookup is a different message. If we have another
371                                                         // request in-flight now replace the original.
372                                                         // Note that in the replace case whether to replace is somewhat
373                                                         // arbitrary - both results will be handled, we're just updating the
374                                                         // value that will be compared to future lookups with the same SCID.
375                                                         if let Some(item) = replacement {
376                                                                 *e.get_mut() = item;
377                                                         }
378                                                 }
379                                         },
380                                         None => {
381                                                 // The earlier lookup already resolved. We can't be sure its the same
382                                                 // so just remove/replace it and move on.
383                                                 if let Some(item) = replacement {
384                                                         *e.get_mut() = item;
385                                                 } else { e.remove(); }
386                                         },
387                                 }
388                         },
389                         hash_map::Entry::Vacant(v) => {
390                                 if let Some(item) = replacement { v.insert(item); }
391                         },
392                 }
393                 Ok(())
394         }
395
396         pub(super) fn check_channel_announcement<U: Deref>(&self,
397                 utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
398                 full_msg: Option<&msgs::ChannelAnnouncement>
399         ) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
400                 let handle_result = |res| {
401                         match res {
402                                 Ok(TxOut { value, script_pubkey }) => {
403                                         let expected_script =
404                                                 make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh();
405                                         if script_pubkey != expected_script {
406                                                 return Err(LightningError{
407                                                         err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
408                                                                 expected_script.to_hex(), script_pubkey.to_hex()),
409                                                         action: ErrorAction::IgnoreError
410                                                 });
411                                         }
412                                         Ok(Some(value))
413                                 },
414                                 Err(UtxoLookupError::UnknownChain) => {
415                                         Err(LightningError {
416                                                 err: format!("Channel announced on an unknown chain ({})",
417                                                         msg.chain_hash.encode().to_hex()),
418                                                 action: ErrorAction::IgnoreError
419                                         })
420                                 },
421                                 Err(UtxoLookupError::UnknownTx) => {
422                                         Err(LightningError {
423                                                 err: "Channel announced without corresponding UTXO entry".to_owned(),
424                                                 action: ErrorAction::IgnoreError
425                                         })
426                                 },
427                         }
428                 };
429
430                 Self::check_replace_previous_entry(msg, full_msg, None,
431                         &mut self.internal.lock().unwrap().channels)?;
432
433                 match utxo_lookup {
434                         &None => {
435                                 // Tentatively accept, potentially exposing us to DoS attacks
436                                 Ok(None)
437                         },
438                         &Some(ref utxo_lookup) => {
439                                 match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
440                                         UtxoResult::Sync(res) => handle_result(res),
441                                         UtxoResult::Async(future) => {
442                                                 let mut pending_checks = self.internal.lock().unwrap();
443                                                 let mut async_messages = future.state.lock().unwrap();
444                                                 if let Some(res) = async_messages.complete.take() {
445                                                         // In the unlikely event the future resolved before we managed to get it,
446                                                         // handle the result in-line.
447                                                         handle_result(res)
448                                                 } else {
449                                                         Self::check_replace_previous_entry(msg, full_msg,
450                                                                 Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
451                                                         async_messages.channel_announce = Some(
452                                                                 if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
453                                                                 else { ChannelAnnouncement::Unsigned(msg.clone()) });
454                                                         pending_checks.nodes.entry(msg.node_id_1)
455                                                                 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
456                                                         pending_checks.nodes.entry(msg.node_id_2)
457                                                                 .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
458                                                         Err(LightningError {
459                                                                 err: "Channel being checked async".to_owned(),
460                                                                 action: ErrorAction::IgnoreAndLog(Level::Gossip),
461                                                         })
462                                                 }
463                                         },
464                                 }
465                         }
466                 }
467         }
468 }