X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=0aa2a85529e7f5b75560e77aee099e0db3f7463e;hb=fb140b55c73171ad91992e39563e970791b66855;hp=2735aac5c51ac1df9dd6e1954aca95d2e739d8f4;hpb=785bdb84cb3fb0305cad2edc30e2df03b6988c9b;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2735aac5..0aa2a855 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -46,7 +46,7 @@ use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, No use crate::ln::features::InvoiceFeatures; use crate::routing::gossip::NetworkGraph; use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router}; -use crate::routing::scoring::ProbabilisticScorer; +use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use crate::ln::msgs; use crate::ln::onion_utils; use crate::ln::onion_utils::HTLCFailReason; @@ -532,13 +532,31 @@ pub(crate) enum MonitorUpdateCompletionAction { /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate /// event can be generated. PaymentClaimed { payment_hash: PaymentHash }, - /// Indicates an [`events::Event`] should be surfaced to the user. - EmitEvent { event: events::Event }, + /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the + /// operation of another channel. + /// + /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge + /// from completing a monitor update which removes the payment preimage until the inbound edge + /// completes a monitor update containing the payment preimage. In that case, after the inbound + /// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the + /// outbound edge. + EmitEventAndFreeOtherChannel { + event: events::Event, + downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>, + }, } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, PaymentClaimed) => { (0, payment_hash, required) }, - (2, EmitEvent) => { (0, event, upgradable_required) }, + (2, EmitEventAndFreeOtherChannel) => { + (0, event, upgradable_required), + // LDK prior to 0.0.116 did not have this field as the monitor update application order was + // required by clients. If we downgrade to something prior to 0.0.116 this may result in + // monitor updates which aren't properly blocked or resumed, however that's fine - we don't + // support async monitor updates even in LDK 0.0.116 and once we do we'll require no + // downgrades to prior versions. + (1, downstream_counterparty_and_funding_outpoint, option), + }, ); #[derive(Clone, Debug, PartialEq, Eq)] @@ -555,6 +573,36 @@ impl_writeable_tlv_based_enum!(EventCompletionAction, }; ); +#[derive(Clone, PartialEq, Eq, Debug)] +/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track +/// the blocked action here. See enum variants for more info. +pub(crate) enum RAAMonitorUpdateBlockingAction { + /// A forwarded payment was claimed. We block the downstream channel completing its monitor + /// update which removes the HTLC preimage until the upstream channel has gotten the preimage + /// durably to disk. + ForwardedPaymentInboundClaim { + /// The upstream channel ID (i.e. the inbound edge). + channel_id: [u8; 32], + /// The HTLC ID on the inbound edge. + htlc_id: u64, + }, +} + +impl RAAMonitorUpdateBlockingAction { + #[allow(unused)] + fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self { + Self::ForwardedPaymentInboundClaim { + channel_id: prev_hop.outpoint.to_channel_id(), + htlc_id: prev_hop.htlc_id, + } + } +} + +impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction, + (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) } +;); + + /// State we hold per-peer. pub(super) struct PeerState { /// `temporary_channel_id` or `channel_id` -> `channel`. @@ -583,6 +631,11 @@ pub(super) struct PeerState { /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure /// duplicates do not occur, so such channels should fail without a monitor update completing. monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec>, + /// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have + /// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update + /// will remove a preimage that needs to be durably in an upstream channel first), we put an + /// entry here to note that the channel with the key's ID is blocked on a set of actions. + actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec>, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. @@ -642,7 +695,9 @@ pub type SimpleArcChannelManager = ChannelManager< Arc>>, Arc, - Arc>>, Arc>>> + Arc>>, Arc>>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer>>, Arc>, >>, Arc >; @@ -658,7 +713,7 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. @@ -2785,10 +2840,9 @@ where let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv) .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected".to_owned()})?; let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(path, total_value, recipient_onion, cur_height, keysend_preimage)?; - if onion_utils::route_size_insane(&onion_payloads) { - return Err(APIError::InvalidRoute{err: "Route size too large considering onion data".to_owned()}); - } - let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); + + let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash) + .map_err(|_| APIError::InvalidRoute { err: "Route size too large considering onion data".to_owned()})?; let err: Result<(), _> = loop { let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.hops.first().unwrap().short_channel_id) { @@ -4000,6 +4054,20 @@ where chan.maybe_expire_prev_config(); + if chan.should_disconnect_peer_awaiting_response() { + log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}", + counterparty_node_id, log_bytes!(*chan_id)); + pending_msg_events.push(MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: *chan_id, + data: "Disconnecting due to timeout awaiting response".to_owned(), + }, + }, + }); + } + true }); if peer_state.ok_to_remove(true) { @@ -4490,16 +4558,16 @@ where Some(claimed_htlc_value - forwarded_htlc_value) } else { None }; - let prev_channel_id = Some(prev_outpoint.to_channel_id()); - let next_channel_id = Some(next_channel_id); - - Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded { - fee_earned_msat, - claim_from_onchain_tx: from_onchain, - prev_channel_id, - next_channel_id, - outbound_amount_forwarded_msat: forwarded_htlc_value_msat, - }}) + Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + event: events::Event::PaymentForwarded { + fee_earned_msat, + claim_from_onchain_tx: from_onchain, + prev_channel_id: Some(prev_outpoint.to_channel_id()), + next_channel_id: Some(next_channel_id), + outbound_amount_forwarded_msat: forwarded_htlc_value_msat, + }, + downstream_counterparty_and_funding_outpoint: None, + }) } else { None } }); if let Err((pk, err)) = res { @@ -4526,8 +4594,13 @@ where }, None)); } }, - MonitorUpdateCompletionAction::EmitEvent { event } => { + MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + event, downstream_counterparty_and_funding_outpoint + } => { self.pending_events.lock().unwrap().push_back((event, None)); + if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint { + self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker)); + } }, } } @@ -4596,7 +4669,7 @@ where if let Some(tx) = funding_broadcastable { log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid()); - self.tx_broadcaster.broadcast_transaction(&tx); + self.tx_broadcaster.broadcast_transactions(&[&tx]); } { @@ -5126,7 +5199,7 @@ where }; if let Some(broadcast_tx) = tx { log_info!(self.logger, "Broadcasting {}", log_tx!(broadcast_tx)); - self.tx_broadcaster.broadcast_transaction(&broadcast_tx); + self.tx_broadcaster.broadcast_transactions(&[&broadcast_tx]); } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { @@ -5374,6 +5447,24 @@ where } } + /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote + /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event + /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of + /// the [`ChannelMonitorUpdate`] in question. + fn raa_monitor_updates_held(&self, + actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec>, + channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey + ) -> bool { + actions_blocking_raa_monitor_updates + .get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false) + || self.pending_events.lock().unwrap().iter().any(|(_, action)| { + action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate { + channel_funding_outpoint, + counterparty_node_id, + }) + }) + } + fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { let (htlcs_to_fail, res) = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -5731,7 +5822,7 @@ where self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure); log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); - self.tx_broadcaster.broadcast_transaction(&tx); + self.tx_broadcaster.broadcast_transactions(&[&tx]); update_maps_on_chan_removal!(self, chan); false } else { true } @@ -6038,25 +6129,37 @@ where self.pending_outbound_payments.clear_pending_payments() } - fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) { + /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an + /// [`Event`] being handled) completes, this should be called to restore the channel to normal + /// operation. It will double-check that nothing *else* is also blocking the same channel from + /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly. + fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { let mut errors = Vec::new(); loop { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { let mut peer_state_lck = peer_state_mtx.lock().unwrap(); let peer_state = &mut *peer_state_lck; - if self.pending_events.lock().unwrap().iter() - .any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate { - channel_funding_outpoint, counterparty_node_id - })) - { - // Check that, while holding the peer lock, we don't have another event - // blocking any monitor updates for this channel. If we do, let those - // events be the ones that ultimately release the monitor update(s). - log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending", + + if let Some(blocker) = completed_blocker.take() { + // Only do this on the first iteration of the loop. + if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates + .get_mut(&channel_funding_outpoint.to_channel_id()) + { + blockers.retain(|iter| iter != &blocker); + } + } + + if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates, + channel_funding_outpoint, counterparty_node_id) { + // Check that, while holding the peer lock, we don't have anything else + // blocking monitor updates for this channel. If we do, release the monitor + // update(s) when those blockers complete. + log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); break; } + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) { debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint); if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { @@ -6098,7 +6201,7 @@ where EventCompletionAction::ReleaseRAAChannelMonitorUpdate { channel_funding_outpoint, counterparty_node_id } => { - self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint); + self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None); } } } @@ -6774,6 +6877,7 @@ where latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, })); }, @@ -7970,6 +8074,7 @@ where latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); @@ -8051,7 +8156,7 @@ where let mut claimable_htlc_purposes = None; let mut claimable_htlc_onion_fields = None; let mut pending_claiming_payments = Some(HashMap::new()); - let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); + let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), @@ -8376,7 +8481,21 @@ where } for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() { - if let Some(peer_state) = per_peer_state.get_mut(&node_id) { + if let Some(peer_state) = per_peer_state.get(&node_id) { + for (_, actions) in monitor_update_blocked_actions.iter() { + for action in actions.iter() { + if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + downstream_counterparty_and_funding_outpoint: + Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), .. + } = action { + if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) { + blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates + .entry(blocked_channel_outpoint.to_channel_id()) + .or_insert_with(Vec::new).push(blocking_action.clone()); + } + } + } + } peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions; } else { log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id); @@ -8691,7 +8810,7 @@ mod tests { }; let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &random_seed_bytes + None, nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -8725,7 +8844,7 @@ mod tests { let payment_preimage = PaymentPreimage([42; 32]); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &random_seed_bytes + None, nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -8788,7 +8907,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &random_seed_bytes + nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -8832,7 +8951,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &random_seed_bytes + nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -9372,7 +9491,7 @@ mod tests { } } -#[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))] +#[cfg(ldk_bench)] pub mod bench { use crate::chain::Listen; use crate::chain::chainmonitor::{ChainMonitor, Persist}; @@ -9392,7 +9511,7 @@ pub mod bench { use crate::sync::{Arc, Mutex}; - use test::Bencher; + use criterion::Criterion; type Manager<'a, P> = ChannelManager< &'a ChainMonitor Option<&test_utils::TestChainMonitor> { None } } - #[cfg(test)] - #[bench] - fn bench_sends(bench: &mut Bencher) { - bench_two_sends(bench, test_utils::TestPersister::new(), test_utils::TestPersister::new()); + pub fn bench_sends(bench: &mut Criterion) { + bench_two_sends(bench, "bench_sends", test_utils::TestPersister::new(), test_utils::TestPersister::new()); } - pub fn bench_two_sends>(bench: &mut Bencher, persister_a: P, persister_b: P) { + pub fn bench_two_sends>(bench: &mut Criterion, bench_name: &str, persister_a: P, persister_b: P) { // Do a simple benchmark of sending a payment back and forth between two nodes. // Note that this is unrealistic as each payment send will require at least two fsync // calls per node. @@ -9489,10 +9606,7 @@ pub mod bench { assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]); - let block = Block { - header: BlockHeader { version: 0x20000000, prev_blockhash: BestBlock::from_network(network).block_hash(), merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 }, - txdata: vec![tx], - }; + let block = create_dummy_block(BestBlock::from_network(network).block_hash(), 42, vec![tx]); Listen::block_connected(&node_a, &block, 1); Listen::block_connected(&node_b, &block, 1); @@ -9575,9 +9689,9 @@ pub mod bench { } } - bench.iter(|| { + bench.bench_function(bench_name, |b| b.iter(|| { send_payment!(node_a, node_b); send_payment!(node_b, node_a); - }); + })); } }