X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Frouting%2Futxo.rs;h=020993f23fb809c250084afe78ec9fad19fbbaad;hb=be4bb58573ab6f56f363496574d64fa83074364b;hp=56b671845894eaefe4f0bd6f7fe3e869a3ec1cb3;hpb=67c9c7f2ae150a287370d56373f673e116172690;p=rust-lightning diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 56b67184..020993f2 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -18,7 +18,8 @@ use bitcoin::hashes::hex::ToHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, LightningError, ErrorAction}; -use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use crate::util::events::MessageSendEvent; use crate::util::logger::{Level, Logger}; use crate::util::ser::Writeable; @@ -41,6 +42,7 @@ pub enum UtxoLookupError { /// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously, /// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async` /// variant. +#[derive(Clone)] pub enum UtxoResult { /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output /// requested or a [`UtxoLookupError`]. @@ -144,8 +146,46 @@ impl UtxoFuture { } /// Resolves this future against the given `graph` and with the given `result`. - pub fn resolve(&self, graph: &NetworkGraph, result: Result) + /// + /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling + /// forwarding the validated gossip message onwards to peers. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events + pub fn resolve_without_forwarding(&self, + graph: &NetworkGraph, result: Result) where L::Target: Logger { + self.do_resolve(graph, result); + } + + /// Resolves this future against the given `graph` and with the given `result`. + /// + /// The given `gossip` is used to broadcast any validated messages onwards to all peers which + /// have available buffer space. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events + pub fn resolve>, U: Deref, GS: Deref>>(&self, + graph: &NetworkGraph, gossip: GS, result: Result + ) where L::Target: Logger, U::Target: UtxoLookup { + let mut res = self.do_resolve(graph, result); + for msg_opt in res.iter_mut() { + if let Some(msg) = msg_opt.take() { + gossip.forward_gossip_msg(msg); + } + } + } + + fn do_resolve(&self, graph: &NetworkGraph, result: Result) + -> [Option; 5] where L::Target: Logger { let (announcement, node_a, node_b, update_a, update_b) = { let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); @@ -155,7 +195,7 @@ impl UtxoFuture { // `channel_announce` yet. That's okay, we can set the `complete` field which it will // check once it gets control again. async_messages.complete = Some(result); - return; + return [None, None, None, None, None]; } let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { @@ -172,6 +212,9 @@ impl UtxoFuture { async_messages.latest_channel_update_b.take()) }; + let mut res = [None, None, None, None, None]; + let mut res_idx = 0; + // Now that we've updated our internal state, pass the pending messages back through the // network graph with a different `UtxoLookup` which will resolve immediately. // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do @@ -179,7 +222,12 @@ impl UtxoFuture { let resolver = UtxoResolver(result); match announcement { ChannelAnnouncement::Full(signed_msg) => { - let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)); + if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement { + msg: signed_msg, update_msg: None, + }); + res_idx += 1; + } }, ChannelAnnouncement::Unsigned(msg) => { let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); @@ -189,7 +237,12 @@ impl UtxoFuture { for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { match announce { Some(NodeAnnouncement::Full(signed_msg)) => { - let _ = graph.update_node_from_announcement(&signed_msg); + if graph.update_node_from_announcement(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement { + msg: signed_msg, + }); + res_idx += 1; + } }, Some(NodeAnnouncement::Unsigned(msg)) => { let _ = graph.update_node_from_unsigned_announcement(&msg); @@ -201,7 +254,12 @@ impl UtxoFuture { for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { match update { Some(ChannelUpdate::Full(signed_msg)) => { - let _ = graph.update_channel(&signed_msg); + if graph.update_channel(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate { + msg: signed_msg, + }); + res_idx += 1; + } }, Some(ChannelUpdate::Unsigned(msg)) => { let _ = graph.update_channel_unsigned(&msg); @@ -209,6 +267,8 @@ impl UtxoFuture { None => {}, } } + + res } } @@ -308,11 +368,6 @@ impl PendingChecks { if latest_announce.is_none() || latest_announce.as_ref().unwrap().timestamp() < msg.timestamp { - // If the messages we got has a higher timestamp, just blindly - // assume the signatures on the new message are correct and drop - // the old message. This may cause us to end up dropping valid - // `node_announcement`s if a peer is malicious, but we should get - // the correct ones when the node updates them. *latest_announce = Some( if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } else { NodeAnnouncement::Unsigned(msg.clone()) }); @@ -465,4 +520,342 @@ impl PendingChecks { } } } + + /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`] + /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending - + /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s + /// which we'll have to process. With a socket buffer of 4KB and a minimum + /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer + /// count` messages to process beyond this limit. Because we'll probably have a few peers, + /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight + /// checks should be more than enough for decent parallelism. + const MAX_PENDING_LOOKUPS: usize = 32; + + /// Returns true if there are a large number of async checks pending and future + /// `channel_announcement` messages should be delayed. Note that this is only a hint and + /// messages already in-flight may still have to be handled for various reasons. + pub(super) fn too_many_checks_pending(&self) -> bool { + let mut pending_checks = self.internal.lock().unwrap(); + if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { + // If we have many channel checks pending, ensure we don't have any dangling checks + // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture` + // instead) before we commit to applying backpressure. + pending_checks.channels.retain(|_, chan| { + Weak::upgrade(&chan).is_some() + }); + pending_checks.nodes.retain(|_, channels| { + channels.retain(|chan| Weak::upgrade(&chan).is_some()); + !channels.is_empty() + }); + pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::routing::gossip::tests::*; + use crate::util::test_utils::{TestChainSource, TestLogger}; + use crate::ln::msgs; + + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; + + use core::sync::atomic::Ordering; + + fn get_network() -> (TestChainSource, NetworkGraph>) { + let logger = Box::new(TestLogger::new()); + let genesis_hash = genesis_block(bitcoin::Network::Testnet).header.block_hash(); + let chain_source = TestChainSource::new(bitcoin::Network::Testnet); + let network_graph = NetworkGraph::new(genesis_hash, logger); + + (chain_source, network_graph) + } + + fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource, + NetworkGraph>, bitcoin::Script, msgs::NodeAnnouncement, + msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate) + { + let secp_ctx = Secp256k1::new(); + + let (chain_source, network_graph) = get_network(); + + let good_script = get_channel_script(&secp_ctx); + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + + let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); + let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx); + + // Note that we have to set the "direction" flag correctly on both messages + let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx); + let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx); + let chan_update_c = get_signed_channel_update(|msg| { + msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx); + + (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, chan_update_c) + } + + #[test] + fn test_fast_async_lookup() { + // Check that async lookups which resolve quicker than the future is returned to the + // `get_utxo` call can read it still resolve properly. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some()); + } + + #[test] + fn test_async_lookup() { + // Test a simple async lookup + let (valid_announcement, chain_source, network_graph, good_script, + node_a_announce, node_b_announce, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 0, script_pubkey: good_script })); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_none()); + + network_graph.update_node_from_announcement(&node_a_announce).unwrap(); + network_graph.update_node_from_announcement(&node_b_announce).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_invalid_async_lookup() { + // Test an async lookup which returns an incorrect script + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() })); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_failing_async_lookup() { + // Test an async lookup which returns an error + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_updates_async_lookup() { + // Test async lookups will process pending channel_update/node_announcements once they + // complete. + let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!( + network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + assert_eq!( + network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some()); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_latest_update_async_lookup() { + // Test async lookups will process the latest channel_update if two are received while + // awaiting an async UTXO lookup. + let (valid_announcement, chain_source, network_graph, good_script, _, + _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .one_to_two.as_ref().unwrap().last_update != + network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .two_to_one.as_ref().unwrap().last_update); + } + + #[test] + fn test_no_double_lookups() { + // Test that a pending async lookup will prevent a second async lookup from flying, but + // only if the channel_announcement message is identical. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // If we make a second request with the same message, the call count doesn't increase... + let future_b = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel announcement is already being checked"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // But if we make a third request with a tweaked message, we should get a second call + // against our new future... + let secp_ctx = Secp256k1::new(); + let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap(); + let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap(); + let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx); + assert_eq!( + network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); + + // Still, if we resolve the original future, the original channel will be accepted. + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + assert!(!network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap() + .announcement_message.as_ref().unwrap() + .contents.features.supports_unknown_test_feature()); + } + + #[test] + fn test_checks_backpressure() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false once they complete. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future completes the "too many checks" flag should reset. + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + #[test] + fn test_checks_backpressure_drop() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false if we drop some of the futures without completion. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag + // should reset to false. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } }