From 2cca65058e4ce3e0120a3fe78a14ed82d1c3a43b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 8 Feb 2023 22:06:11 +0000 Subject: [PATCH] Add an async resolution option to `ChainAccess::get_utxo` For those operating in an async environment, requiring `ChainAccess::get_utxo` return information about the requested UTXO synchronously is incredibly painful. Requesting information about a random UTXO is likely to go over the network, and likely to be a rather slow request. Thus, here, we change the return type of `get_utxo` to have both a synchronous and asynchronous form. The asynchronous form requires the user construct a `AccessFuture` which they `clone` and pass back to us. Internally, an `AccessFuture` has an `Arc` to the `channel_announcement` message which we need to process. When the user completes their lookup, they call `resolve` on their `AccessFuture` which we pull the `channel_announcement` from and then apply to the network graph. --- fuzz/src/router.rs | 48 +++++++--- lightning/src/routing/gossip.rs | 7 +- lightning/src/routing/utxo.rs | 145 ++++++++++++++++++++++++++++--- lightning/src/util/test_utils.rs | 8 +- 4 files changed, 180 insertions(+), 28 deletions(-) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index 3d5c88bc3..a30c7d28a 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -15,7 +15,7 @@ use lightning::chain::transaction::OutPoint; use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty}; use lightning::ln::msgs; use lightning::routing::gossip::{NetworkGraph, RoutingFees}; -use lightning::routing::utxo::{UtxoLookup, UtxoLookupError}; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters}; use lightning::routing::scoring::FixedPenaltyScorer; use lightning::util::config::UserConfig; @@ -81,17 +81,36 @@ impl InputData { } } -struct FuzzChainSource { +struct FuzzChainSource<'a, 'b, Out: test_logger::Output> { input: Arc, + net_graph: &'a NetworkGraph<&'b test_logger::TestLogger>, } -impl UtxoLookup for FuzzChainSource { - fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { - match self.input.get_slice(2) { - Some(&[0, _]) => Err(UtxoLookupError::UnknownChain), - Some(&[1, _]) => Err(UtxoLookupError::UnknownTx), - Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }), - None => Err(UtxoLookupError::UnknownTx), - _ => unreachable!(), +impl UtxoLookup for FuzzChainSource<'_, '_, Out> { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + let input_slice = self.input.get_slice(2); + if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); } + let input_slice = input_slice.unwrap(); + let txo_res = TxOut { + value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 }, + script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(), + }; + match input_slice { + &[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)), + &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), + &[2, _] => { + let future = UtxoFuture::new(); + future.resolve(self.net_graph, Ok(txo_res)); + UtxoResult::Async(future.clone()) + }, + &[3, _] => { + let future = UtxoFuture::new(); + future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx)); + UtxoResult::Async(future.clone()) + }, + &[4, _] => { + UtxoResult::Async(UtxoFuture::new()) // the future will never resolve + }, + &[..] => UtxoResult::Sync(Ok(txo_res)), } } } @@ -171,6 +190,10 @@ pub fn do_test(data: &[u8], out: Out) { let our_pubkey = get_pubkey!(); let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger); + let chain_source = FuzzChainSource { + input: Arc::clone(&input), + net_graph: &net_graph, + }; let mut node_pks = HashSet::new(); let mut scid = 42; @@ -191,13 +214,14 @@ pub fn do_test(data: &[u8], out: Out) { let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1)); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2)); - let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None); + let _ = net_graph.update_channel_from_unsigned_announcement:: + <&FuzzChainSource<'_, '_, Out>>(&msg, &None); }, 2 => { let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1)); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2)); - let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) })); + let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source)); }, 3 => { let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72)); diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index bfc301864..dd9cabd25 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -155,6 +155,8 @@ pub struct NetworkGraph where L::Target: Logger { /// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so /// that once some time passes, we can potentially resync it from gossip again. removed_nodes: Mutex>>, + /// Announcement messages which are awaiting an on-chain lookup to be processed. + pub(super) pending_checks: utxo::PendingChecks, } /// A read-only view of [`NetworkGraph`]. @@ -1200,6 +1202,7 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp), removed_nodes: Mutex::new(HashMap::new()), removed_channels: Mutex::new(HashMap::new()), + pending_checks: utxo::PendingChecks::new(), }) } } @@ -1239,6 +1242,7 @@ impl NetworkGraph where L::Target: Logger { last_rapid_gossip_sync_timestamp: Mutex::new(None), removed_channels: Mutex::new(HashMap::new()), removed_nodes: Mutex::new(HashMap::new()), + pending_checks: utxo::PendingChecks::new(), } } @@ -1494,7 +1498,8 @@ impl NetworkGraph where L::Target: Logger { } } - let utxo_value = utxo::check_channel_announcement(utxo_lookup, msg)?; + let utxo_value = self.pending_checks.check_channel_announcement( + utxo_lookup, msg, full_msg)?; #[allow(unused_mut, unused_assignments)] let mut announcement_received_time = 0; diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index ee56f2d92..18c7ff97d 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -18,10 +18,14 @@ use bitcoin::hashes::hex::ToHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, LightningError, ErrorAction}; +use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::util::logger::{Level, Logger}; use crate::util::ser::Writeable; use crate::prelude::*; +use alloc::sync::{Arc, Weak}; +use crate::sync::Mutex; use core::ops::Deref; /// An error when accessing the chain via [`UtxoLookup`]. @@ -34,6 +38,23 @@ pub enum UtxoLookupError { UnknownTx, } +/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously, +/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async` +/// variant. +pub enum UtxoResult { + /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output + /// requested or a [`UtxoLookupError`]. + Sync(Result), + /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of + /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes. + /// + /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited, + /// but only fairly loosely. Because a pending checks block all message processing, leaving + /// checks pending for an extended time may cause DoS of other functions. It is recommended you + /// keep a tight timeout on lookups, on the order of a few seconds. + Async(UtxoFuture), +} + /// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs. pub trait UtxoLookup { /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`]. @@ -41,19 +62,93 @@ pub trait UtxoLookup { /// is unknown. /// /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id - fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; + fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult; +} + +enum ChannelAnnouncement { + Full(msgs::ChannelAnnouncement), + Unsigned(msgs::UnsignedChannelAnnouncement), +} + +struct UtxoMessages { + complete: Option>, + channel_announce: Option, +} + +/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async. +/// +/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info. +#[derive(Clone)] +pub struct UtxoFuture { + state: Arc>, +} + +/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph +/// once we have a concrete resolution of a request. +struct UtxoResolver(Result); +impl UtxoLookup for UtxoResolver { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + UtxoResult::Sync(self.0.clone()) + } +} + +impl UtxoFuture { + /// Builds a new future for later resolution. + pub fn new() -> Self { + Self { state: Arc::new(Mutex::new(UtxoMessages { + complete: None, + channel_announce: None, + }))} + } + + /// Resolves this future against the given `graph` and with the given `result`. + pub fn resolve(&self, graph: &NetworkGraph, result: Result) + where L::Target: Logger { + let announcement = { + let mut async_messages = self.state.lock().unwrap(); + + if async_messages.channel_announce.is_none() { + // We raced returning to `check_channel_announcement` which hasn't updated + // `channel_announce` yet. That's okay, we can set the `complete` field which it will + // check once it gets control again. + async_messages.complete = Some(result); + return; + } + + async_messages.channel_announce.take().unwrap() + }; + + // Now that we've updated our internal state, pass the pending messages back through the + // network graph with a different `UtxoLookup` which will resolve immediately. + // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do + // with them. + let resolver = UtxoResolver(result); + match announcement { + ChannelAnnouncement::Full(signed_msg) => { + let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)); + }, + ChannelAnnouncement::Unsigned(msg) => { + let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); + }, + } + } +} + +/// A set of messages which are pending UTXO lookups for processing. +pub(super) struct PendingChecks { } -pub(crate) fn check_channel_announcement( - utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement -) -> Result, msgs::LightningError> where U::Target: UtxoLookup { - match utxo_lookup { - &None => { - // Tentatively accept, potentially exposing us to DoS attacks - Ok(None) - }, - &Some(ref utxo_lookup) => { - match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { +impl PendingChecks { + pub(super) fn new() -> Self { + PendingChecks {} + } + + pub(super) fn check_channel_announcement(&self, + utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement> + ) -> Result, msgs::LightningError> where U::Target: UtxoLookup { + let handle_result = |res| { + match res { Ok(TxOut { value, script_pubkey }) => { let expected_script = make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); @@ -80,6 +175,34 @@ pub(crate) fn check_channel_announcement( }) }, } + }; + + match utxo_lookup { + &None => { + // Tentatively accept, potentially exposing us to DoS attacks + Ok(None) + }, + &Some(ref utxo_lookup) => { + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { + UtxoResult::Sync(res) => handle_result(res), + UtxoResult::Async(future) => { + let mut async_messages = future.state.lock().unwrap(); + if let Some(res) = async_messages.complete.take() { + // In the unlikely event the future resolved before we managed to get it, + // handle the result in-line. + handle_result(res) + } else { + async_messages.channel_announce = Some( + if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } + else { ChannelAnnouncement::Unsigned(msg.clone()) }); + Err(LightningError { + err: "Channel being checked async".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }) + } + }, + } + } } } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 773242313..b62479555 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -23,7 +23,7 @@ use crate::ln::{msgs, wire}; use crate::ln::msgs::LightningError; use crate::ln::script::ShutdownScript; use crate::routing::gossip::{NetworkGraph, NodeId}; -use crate::routing::utxo::{UtxoLookup, UtxoLookupError}; +use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult}; use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs}; use crate::routing::scoring::FixedPenaltyScorer; use crate::util::config::UserConfig; @@ -857,12 +857,12 @@ impl TestChainSource { } impl UtxoLookup for TestChainSource { - fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { + fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { if self.genesis_hash != *genesis_hash { - return Err(UtxoLookupError::UnknownChain); + return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)); } - self.utxo_ret.lock().unwrap().clone() + UtxoResult::Sync(self.utxo_ret.lock().unwrap().clone()) } } -- 2.39.5