Add an async resolution option to `ChainAccess::get_utxo`
authorMatt Corallo <git@bluematt.me>
Wed, 8 Feb 2023 22:06:11 +0000 (22:06 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 8 Feb 2023 23:54:30 +0000 (23:54 +0000)
For those operating in an async environment, requiring
`ChainAccess::get_utxo` return information about the requested UTXO
synchronously is incredibly painful. Requesting information about a
random UTXO is likely to go over the network, and likely to be a
rather slow request.

Thus, here, we change the return type of `get_utxo` to have both a
synchronous and asynchronous form. The asynchronous form requires
the user construct a `AccessFuture` which they `clone` and pass
back to us. Internally, an `AccessFuture` has an `Arc` to the
`channel_announcement` message which we need to process. When the
user completes their lookup, they call `resolve` on their
`AccessFuture` which we pull the `channel_announcement` from and
then apply to the network graph.

fuzz/src/router.rs
lightning/src/routing/gossip.rs
lightning/src/routing/utxo.rs
lightning/src/util/test_utils.rs

index 3d5c88bc3da926c6d7e6434b74c197202df1373f..a30c7d28af4931230a9384f71ece64afdc8b6429 100644 (file)
@@ -15,7 +15,7 @@ use lightning::chain::transaction::OutPoint;
 use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty};
 use lightning::ln::msgs;
 use lightning::routing::gossip::{NetworkGraph, RoutingFees};
-use lightning::routing::utxo::{UtxoLookup, UtxoLookupError};
+use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
 use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters};
 use lightning::routing::scoring::FixedPenaltyScorer;
 use lightning::util::config::UserConfig;
@@ -81,17 +81,36 @@ impl InputData {
        }
 }
 
-struct FuzzChainSource {
+struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
        input: Arc<InputData>,
+       net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
 }
-impl UtxoLookup for FuzzChainSource {
-       fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
-               match self.input.get_slice(2) {
-                       Some(&[0, _]) => Err(UtxoLookupError::UnknownChain),
-                       Some(&[1, _]) => Err(UtxoLookupError::UnknownTx),
-                       Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }),
-                       None => Err(UtxoLookupError::UnknownTx),
-                       _ => unreachable!(),
+impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
+       fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
+               let input_slice = self.input.get_slice(2);
+               if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); }
+               let input_slice = input_slice.unwrap();
+               let txo_res = TxOut {
+                       value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 },
+                       script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(),
+               };
+               match input_slice {
+                       &[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)),
+                       &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
+                       &[2, _] => {
+                               let future = UtxoFuture::new();
+                               future.resolve(self.net_graph, Ok(txo_res));
+                               UtxoResult::Async(future.clone())
+                       },
+                       &[3, _] => {
+                               let future = UtxoFuture::new();
+                               future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx));
+                               UtxoResult::Async(future.clone())
+                       },
+                       &[4, _] => {
+                               UtxoResult::Async(UtxoFuture::new()) // the future will never resolve
+                       },
+                       &[..] => UtxoResult::Sync(Ok(txo_res)),
                }
        }
 }
@@ -171,6 +190,10 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
 
        let our_pubkey = get_pubkey!();
        let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger);
+       let chain_source = FuzzChainSource {
+               input: Arc::clone(&input),
+               net_graph: &net_graph,
+       };
 
        let mut node_pks = HashSet::new();
        let mut scid = 42;
@@ -191,13 +214,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
                                node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
                                node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
-                               let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
+                               let _ = net_graph.update_channel_from_unsigned_announcement::
+                                       <&FuzzChainSource<'_, '_, Out>>(&msg, &None);
                        },
                        2 => {
                                let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
                                node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
                                node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
-                               let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) }));
+                               let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source));
                        },
                        3 => {
                                let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72));
index bfc3018643bf2711d06bdbcb4a711ed03dde147b..dd9cabd25eba50d8aba05e267c6900cb073fb28d 100644 (file)
@@ -155,6 +155,8 @@ pub struct NetworkGraph<L: Deref> where L::Target: Logger {
        /// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so
        /// that once some time passes, we can potentially resync it from gossip again.
        removed_nodes: Mutex<HashMap<NodeId, Option<u64>>>,
+       /// Announcement messages which are awaiting an on-chain lookup to be processed.
+       pub(super) pending_checks: utxo::PendingChecks,
 }
 
 /// A read-only view of [`NetworkGraph`].
@@ -1200,6 +1202,7 @@ impl<L: Deref> ReadableArgs<L> for NetworkGraph<L> where L::Target: Logger {
                        last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
                        removed_nodes: Mutex::new(HashMap::new()),
                        removed_channels: Mutex::new(HashMap::new()),
+                       pending_checks: utxo::PendingChecks::new(),
                })
        }
 }
@@ -1239,6 +1242,7 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
                        last_rapid_gossip_sync_timestamp: Mutex::new(None),
                        removed_channels: Mutex::new(HashMap::new()),
                        removed_nodes: Mutex::new(HashMap::new()),
+                       pending_checks: utxo::PendingChecks::new(),
                }
        }
 
@@ -1494,7 +1498,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
                        }
                }
 
-               let utxo_value = utxo::check_channel_announcement(utxo_lookup, msg)?;
+               let utxo_value = self.pending_checks.check_channel_announcement(
+                       utxo_lookup, msg, full_msg)?;
 
                #[allow(unused_mut, unused_assignments)]
                let mut announcement_received_time = 0;
index ee56f2d928858aa8775a1f85d2211fbbc43ee094..18c7ff97d94508093507c8684bf537d5b5b71bf0 100644 (file)
@@ -18,10 +18,14 @@ use bitcoin::hashes::hex::ToHex;
 
 use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
 use crate::ln::msgs::{self, LightningError, ErrorAction};
+use crate::routing::gossip::{NetworkGraph, NodeId};
+use crate::util::logger::{Level, Logger};
 use crate::util::ser::Writeable;
 
 use crate::prelude::*;
 
+use alloc::sync::{Arc, Weak};
+use crate::sync::Mutex;
 use core::ops::Deref;
 
 /// An error when accessing the chain via [`UtxoLookup`].
@@ -34,6 +38,23 @@ pub enum UtxoLookupError {
        UnknownTx,
 }
 
+/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
+/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
+/// variant.
+pub enum UtxoResult {
+       /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
+       /// requested or a [`UtxoLookupError`].
+       Sync(Result<TxOut, UtxoLookupError>),
+       /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
+       /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
+       ///
+       /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
+       /// but only fairly loosely. Because a pending checks block all message processing, leaving
+       /// checks pending for an extended time may cause DoS of other functions. It is recommended you
+       /// keep a tight timeout on lookups, on the order of a few seconds.
+       Async(UtxoFuture),
+}
+
 /// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
 pub trait UtxoLookup {
        /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
@@ -41,19 +62,93 @@ pub trait UtxoLookup {
        /// is unknown.
        ///
        /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
-       fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, UtxoLookupError>;
+       fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult;
+}
+
+enum ChannelAnnouncement {
+       Full(msgs::ChannelAnnouncement),
+       Unsigned(msgs::UnsignedChannelAnnouncement),
+}
+
+struct UtxoMessages {
+       complete: Option<Result<TxOut, UtxoLookupError>>,
+       channel_announce: Option<ChannelAnnouncement>,
+}
+
+/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
+///
+/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
+#[derive(Clone)]
+pub struct UtxoFuture {
+       state: Arc<Mutex<UtxoMessages>>,
+}
+
+/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
+/// once we have a concrete resolution of a request.
+struct UtxoResolver(Result<TxOut, UtxoLookupError>);
+impl UtxoLookup for UtxoResolver {
+       fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
+               UtxoResult::Sync(self.0.clone())
+       }
+}
+
+impl UtxoFuture {
+       /// Builds a new future for later resolution.
+       pub fn new() -> Self {
+               Self { state: Arc::new(Mutex::new(UtxoMessages {
+                       complete: None,
+                       channel_announce: None,
+               }))}
+       }
+
+       /// Resolves this future against the given `graph` and with the given `result`.
+       pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
+       where L::Target: Logger {
+               let announcement = {
+                       let mut async_messages = self.state.lock().unwrap();
+
+                       if async_messages.channel_announce.is_none() {
+                               // We raced returning to `check_channel_announcement` which hasn't updated
+                               // `channel_announce` yet. That's okay, we can set the `complete` field which it will
+                               // check once it gets control again.
+                               async_messages.complete = Some(result);
+                               return;
+                       }
+
+                       async_messages.channel_announce.take().unwrap()
+               };
+
+               // Now that we've updated our internal state, pass the pending messages back through the
+               // network graph with a different `UtxoLookup` which will resolve immediately.
+               // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
+               // with them.
+               let resolver = UtxoResolver(result);
+               match announcement {
+                       ChannelAnnouncement::Full(signed_msg) => {
+                               let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
+                       },
+                       ChannelAnnouncement::Unsigned(msg) => {
+                               let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
+                       },
+               }
+       }
+}
+
+/// A set of messages which are pending UTXO lookups for processing.
+pub(super) struct PendingChecks {
 }
 
-pub(crate) fn check_channel_announcement<U: Deref>(
-       utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement
-) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
-       match utxo_lookup {
-               &None => {
-                       // Tentatively accept, potentially exposing us to DoS attacks
-                       Ok(None)
-               },
-               &Some(ref utxo_lookup) => {
-                       match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
+impl PendingChecks {
+       pub(super) fn new() -> Self {
+               PendingChecks {}
+       }
+
+       pub(super) fn check_channel_announcement<U: Deref>(&self,
+               utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
+               full_msg: Option<&msgs::ChannelAnnouncement>
+       ) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
+               let handle_result = |res| {
+                       match res {
                                Ok(TxOut { value, script_pubkey }) => {
                                        let expected_script =
                                                make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh();
@@ -80,6 +175,34 @@ pub(crate) fn check_channel_announcement<U: Deref>(
                                        })
                                },
                        }
+               };
+
+               match utxo_lookup {
+                       &None => {
+                               // Tentatively accept, potentially exposing us to DoS attacks
+                               Ok(None)
+                       },
+                       &Some(ref utxo_lookup) => {
+                               match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
+                                       UtxoResult::Sync(res) => handle_result(res),
+                                       UtxoResult::Async(future) => {
+                                               let mut async_messages = future.state.lock().unwrap();
+                                               if let Some(res) = async_messages.complete.take() {
+                                                       // In the unlikely event the future resolved before we managed to get it,
+                                                       // handle the result in-line.
+                                                       handle_result(res)
+                                               } else {
+                                                       async_messages.channel_announce = Some(
+                                                               if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
+                                                               else { ChannelAnnouncement::Unsigned(msg.clone()) });
+                                                       Err(LightningError {
+                                                               err: "Channel being checked async".to_owned(),
+                                                               action: ErrorAction::IgnoreAndLog(Level::Gossip),
+                                                       })
+                                               }
+                                       },
+                               }
+                       }
                }
        }
 }
index 773242313933c867badfdb90e39df82f96250338..b62479555ccea4fd4064b175f4e24974dc696372 100644 (file)
@@ -23,7 +23,7 @@ use crate::ln::{msgs, wire};
 use crate::ln::msgs::LightningError;
 use crate::ln::script::ShutdownScript;
 use crate::routing::gossip::{NetworkGraph, NodeId};
-use crate::routing::utxo::{UtxoLookup, UtxoLookupError};
+use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult};
 use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs};
 use crate::routing::scoring::FixedPenaltyScorer;
 use crate::util::config::UserConfig;
@@ -857,12 +857,12 @@ impl TestChainSource {
 }
 
 impl UtxoLookup for TestChainSource {
-       fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
+       fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
                if self.genesis_hash != *genesis_hash {
-                       return Err(UtxoLookupError::UnknownChain);
+                       return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain));
                }
 
-               self.utxo_ret.lock().unwrap().clone()
+               UtxoResult::Sync(self.utxo_ret.lock().unwrap().clone())
        }
 }