use crate::ln::features::InvoiceFeatures;
use crate::routing::gossip::NetworkGraph;
use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router};
-use crate::routing::scoring::ProbabilisticScorer;
+use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
use crate::ln::msgs;
use crate::ln::onion_utils;
use crate::ln::onion_utils::HTLCFailReason;
/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
/// event can be generated.
PaymentClaimed { payment_hash: PaymentHash },
- /// Indicates an [`events::Event`] should be surfaced to the user.
- EmitEvent { event: events::Event },
+ /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
+ /// operation of another channel.
+ ///
+ /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
+ /// from completing a monitor update which removes the payment preimage until the inbound edge
+ /// completes a monitor update containing the payment preimage. In that case, after the inbound
+ /// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the
+ /// outbound edge.
+ EmitEventAndFreeOtherChannel {
+ event: events::Event,
+ downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+ },
}
impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
(0, PaymentClaimed) => { (0, payment_hash, required) },
- (2, EmitEvent) => { (0, event, upgradable_required) },
+ (2, EmitEventAndFreeOtherChannel) => {
+ (0, event, upgradable_required),
+ // LDK prior to 0.0.116 did not have this field as the monitor update application order was
+ // required by clients. If we downgrade to something prior to 0.0.116 this may result in
+ // monitor updates which aren't properly blocked or resumed, however that's fine - we don't
+ // support async monitor updates even in LDK 0.0.116 and once we do we'll require no
+ // downgrades to prior versions.
+ (1, downstream_counterparty_and_funding_outpoint, option),
+ },
);
#[derive(Clone, Debug, PartialEq, Eq)]
};
);
+#[derive(Clone, PartialEq, Eq, Debug)]
+/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
+/// the blocked action here. See enum variants for more info.
+pub(crate) enum RAAMonitorUpdateBlockingAction {
+ /// A forwarded payment was claimed. We block the downstream channel completing its monitor
+ /// update which removes the HTLC preimage until the upstream channel has gotten the preimage
+ /// durably to disk.
+ ForwardedPaymentInboundClaim {
+ /// The upstream channel ID (i.e. the inbound edge).
+ channel_id: [u8; 32],
+ /// The HTLC ID on the inbound edge.
+ htlc_id: u64,
+ },
+}
+
+impl RAAMonitorUpdateBlockingAction {
+ #[allow(unused)]
+ fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
+ Self::ForwardedPaymentInboundClaim {
+ channel_id: prev_hop.outpoint.to_channel_id(),
+ htlc_id: prev_hop.htlc_id,
+ }
+ }
+}
+
+impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
+ (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
+;);
+
+
/// State we hold per-peer.
pub(super) struct PeerState<Signer: ChannelSigner> {
/// `temporary_channel_id` or `channel_id` -> `channel`.
/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
/// duplicates do not occur, so such channels should fail without a monitor update completing.
monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+ /// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
+ /// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
+ /// will remove a preimage that needs to be durably in an upstream channel first), we put an
+ /// entry here to note that the channel with the key's ID is blocked on a set of actions.
+ actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
/// The peer is currently connected (i.e. we've seen a
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`].
Arc<DefaultRouter<
Arc<NetworkGraph<Arc<L>>>,
Arc<L>,
- Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<L>>>, Arc<L>>>>
+ Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<L>>>, Arc<L>>>>,
+ ProbabilisticScoringFeeParameters,
+ ProbabilisticScorer<Arc<NetworkGraph<Arc<L>>>, Arc<L>>,
>>,
Arc<L>
>;
/// of [`KeysManager`] and [`DefaultRouter`].
///
/// This is not exported to bindings users as Arcs don't make sense in bindings
-pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>>, &'g L>;
+pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>;
macro_rules! define_test_pub_trait { ($vis: vis) => {
/// A trivial trait which describes any [`ChannelManager`] used in testing.
let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv)
.map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected".to_owned()})?;
let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(path, total_value, recipient_onion, cur_height, keysend_preimage)?;
- if onion_utils::route_size_insane(&onion_payloads) {
- return Err(APIError::InvalidRoute{err: "Route size too large considering onion data".to_owned()});
- }
- let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
+
+ let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash)
+ .map_err(|_| APIError::InvalidRoute { err: "Route size too large considering onion data".to_owned()})?;
let err: Result<(), _> = loop {
let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.hops.first().unwrap().short_channel_id) {
chan.maybe_expire_prev_config();
+ if chan.should_disconnect_peer_awaiting_response() {
+ log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}",
+ counterparty_node_id, log_bytes!(*chan_id));
+ pending_msg_events.push(MessageSendEvent::HandleError {
+ node_id: counterparty_node_id,
+ action: msgs::ErrorAction::DisconnectPeerWithWarning {
+ msg: msgs::WarningMessage {
+ channel_id: *chan_id,
+ data: "Disconnecting due to timeout awaiting response".to_owned(),
+ },
+ },
+ });
+ }
+
true
});
if peer_state.ok_to_remove(true) {
Some(claimed_htlc_value - forwarded_htlc_value)
} else { None };
- let prev_channel_id = Some(prev_outpoint.to_channel_id());
- let next_channel_id = Some(next_channel_id);
-
- Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
- fee_earned_msat,
- claim_from_onchain_tx: from_onchain,
- prev_channel_id,
- next_channel_id,
- outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
- }})
+ Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+ event: events::Event::PaymentForwarded {
+ fee_earned_msat,
+ claim_from_onchain_tx: from_onchain,
+ prev_channel_id: Some(prev_outpoint.to_channel_id()),
+ next_channel_id: Some(next_channel_id),
+ outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
+ },
+ downstream_counterparty_and_funding_outpoint: None,
+ })
} else { None }
});
if let Err((pk, err)) = res {
}, None));
}
},
- MonitorUpdateCompletionAction::EmitEvent { event } => {
+ MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+ event, downstream_counterparty_and_funding_outpoint
+ } => {
self.pending_events.lock().unwrap().push_back((event, None));
+ if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
+ self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+ }
},
}
}
if let Some(tx) = funding_broadcastable {
log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
- self.tx_broadcaster.broadcast_transaction(&tx);
+ self.tx_broadcaster.broadcast_transactions(&[&tx]);
}
{
};
if let Some(broadcast_tx) = tx {
log_info!(self.logger, "Broadcasting {}", log_tx!(broadcast_tx));
- self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
+ self.tx_broadcaster.broadcast_transactions(&[&broadcast_tx]);
}
if let Some(chan) = chan_option {
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
}
}
+ /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
+ /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
+ /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
+ /// the [`ChannelMonitorUpdate`] in question.
+ fn raa_monitor_updates_held(&self,
+ actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
+ channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+ ) -> bool {
+ actions_blocking_raa_monitor_updates
+ .get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+ || self.pending_events.lock().unwrap().iter().any(|(_, action)| {
+ action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+ channel_funding_outpoint,
+ counterparty_node_id,
+ })
+ })
+ }
+
fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
let (htlcs_to_fail, res) = {
let per_peer_state = self.per_peer_state.read().unwrap();
self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure);
log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
- self.tx_broadcaster.broadcast_transaction(&tx);
+ self.tx_broadcaster.broadcast_transactions(&[&tx]);
update_maps_on_chan_removal!(self, chan);
false
} else { true }
self.pending_outbound_payments.clear_pending_payments()
}
- fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+ /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
+ /// [`Event`] being handled) completes, this should be called to restore the channel to normal
+ /// operation. It will double-check that nothing *else* is also blocking the same channel from
+ /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
+ fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
let mut errors = Vec::new();
loop {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
let peer_state = &mut *peer_state_lck;
- if self.pending_events.lock().unwrap().iter()
- .any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
- channel_funding_outpoint, counterparty_node_id
- }))
- {
- // Check that, while holding the peer lock, we don't have another event
- // blocking any monitor updates for this channel. If we do, let those
- // events be the ones that ultimately release the monitor update(s).
- log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+
+ if let Some(blocker) = completed_blocker.take() {
+ // Only do this on the first iteration of the loop.
+ if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
+ .get_mut(&channel_funding_outpoint.to_channel_id())
+ {
+ blockers.retain(|iter| iter != &blocker);
+ }
+ }
+
+ if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+ channel_funding_outpoint, counterparty_node_id) {
+ // Check that, while holding the peer lock, we don't have anything else
+ // blocking monitor updates for this channel. If we do, release the monitor
+ // update(s) when those blockers complete.
+ log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
break;
}
+
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint, counterparty_node_id
} => {
- self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+ self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
}
}
}
latest_features: init_msg.features.clone(),
pending_msg_events: Vec::new(),
monitor_update_blocked_actions: BTreeMap::new(),
+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
is_connected: true,
}));
},
latest_features: Readable::read(reader)?,
pending_msg_events: Vec::new(),
monitor_update_blocked_actions: BTreeMap::new(),
+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
is_connected: false,
};
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
let mut claimable_htlc_purposes = None;
let mut claimable_htlc_onion_fields = None;
let mut pending_claiming_payments = Some(HashMap::new());
- let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+ let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
let mut events_override = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
}
for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
- if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+ if let Some(peer_state) = per_peer_state.get(&node_id) {
+ for (_, actions) in monitor_update_blocked_actions.iter() {
+ for action in actions.iter() {
+ if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+ downstream_counterparty_and_funding_outpoint:
+ Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+ } = action {
+ if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+ blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
+ .entry(blocked_channel_outpoint.to_channel_id())
+ .or_insert_with(Vec::new).push(blocking_action.clone());
+ }
+ }
+ }
+ }
peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
} else {
log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
};
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &random_seed_bytes
+ None, nodes[0].logger, &scorer, &(), &random_seed_bytes
).unwrap();
nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
let payment_preimage = PaymentPreimage([42; 32]);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &random_seed_bytes
+ None, nodes[0].logger, &scorer, &(), &random_seed_bytes
).unwrap();
let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
let route = find_route(
&payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
- nodes[0].logger, &scorer, &random_seed_bytes
+ nodes[0].logger, &scorer, &(), &random_seed_bytes
).unwrap();
let test_preimage = PaymentPreimage([42; 32]);
let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
let route = find_route(
&payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
- nodes[0].logger, &scorer, &random_seed_bytes
+ nodes[0].logger, &scorer, &(), &random_seed_bytes
).unwrap();
let test_preimage = PaymentPreimage([42; 32]);
}
}
-#[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))]
+#[cfg(ldk_bench)]
pub mod bench {
use crate::chain::Listen;
use crate::chain::chainmonitor::{ChainMonitor, Persist};
use crate::sync::{Arc, Mutex};
- use test::Bencher;
+ use criterion::Criterion;
type Manager<'a, P> = ChannelManager<
&'a ChainMonitor<InMemorySigner, &'a test_utils::TestChainSource,
fn chain_monitor(&self) -> Option<&test_utils::TestChainMonitor> { None }
}
- #[cfg(test)]
- #[bench]
- fn bench_sends(bench: &mut Bencher) {
- bench_two_sends(bench, test_utils::TestPersister::new(), test_utils::TestPersister::new());
+ pub fn bench_sends(bench: &mut Criterion) {
+ bench_two_sends(bench, "bench_sends", test_utils::TestPersister::new(), test_utils::TestPersister::new());
}
- pub fn bench_two_sends<P: Persist<InMemorySigner>>(bench: &mut Bencher, persister_a: P, persister_b: P) {
+ pub fn bench_two_sends<P: Persist<InMemorySigner>>(bench: &mut Criterion, bench_name: &str, persister_a: P, persister_b: P) {
// Do a simple benchmark of sending a payment back and forth between two nodes.
// Note that this is unrealistic as each payment send will require at least two fsync
// calls per node.
assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]);
- let block = Block {
- header: BlockHeader { version: 0x20000000, prev_blockhash: BestBlock::from_network(network).block_hash(), merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 },
- txdata: vec![tx],
- };
+ let block = create_dummy_block(BestBlock::from_network(network).block_hash(), 42, vec![tx]);
Listen::block_connected(&node_a, &block, 1);
Listen::block_connected(&node_b, &block, 1);
}
}
- bench.iter(|| {
+ bench.bench_function(bench_name, |b| b.iter(|| {
send_payment!(node_a, node_b);
send_payment!(node_b, node_a);
- });
+ }));
}
}