X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=50e65db3f67dabc6227491b634e93cb404c51787;hb=99985c620962fd35163ee8b585c3a7edb6491db7;hp=5f77c43cc375d7ccc91fcbe787268462edd8483d;hpb=d5b05e54c32cd53e0534849b57242c65747738b1;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 5f77c43c..50e65db3 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -19,7 +19,7 @@ use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::transaction::Transaction; -use bitcoin::blockdata::constants::genesis_block; +use bitcoin::blockdata::constants::{genesis_block, ChainHash}; use bitcoin::network::constants::Network; use bitcoin::hashes::Hash; @@ -40,13 +40,13 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; -use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch}; +use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch}; use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; #[cfg(any(feature = "_test_utils", test))] use crate::ln::features::InvoiceFeatures; use crate::routing::gossip::NetworkGraph; -use crate::routing::router::{DefaultRouter, InFlightHtlcs, Path, PaymentParameters, Route, RouteHop, RouteParameters, Router}; -use crate::routing::scoring::ProbabilisticScorer; +use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router}; +use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use crate::ln::msgs; use crate::ln::onion_utils; use crate::ln::onion_utils::HTLCFailReason; @@ -55,8 +55,8 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA use crate::ln::outbound_payment; use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment}; use crate::ln::wire::Encode; -use crate::chain::keysinterface::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner}; -use crate::util::config::{UserConfig, ChannelConfig}; +use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner}; +use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate}; use crate::util::wakers::{Future, Notifier}; use crate::util::scid_utils::fake_scid; use crate::util::string::UntrustedString; @@ -72,12 +72,13 @@ use core::{cmp, mem}; use core::cell::RefCell; use crate::io::Read; use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState}; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; use core::time::Duration; use core::ops::Deref; // Re-export this for use in the public API. pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields}; +use crate::ln::script::ShutdownScript; // We hold various information about HTLC relay in the HTLC objects in Channel itself: // @@ -358,8 +359,6 @@ pub enum FailureCode { IncorrectOrUnknownPaymentDetails = 0x4000 | 15, } -type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>); - /// Error type returned across the peer_state mutex boundary. When an Err is generated for a /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel /// immediately (ie with no further calls on it made). Thus, this step happens inside a @@ -496,13 +495,34 @@ struct ClaimablePayments { pending_claiming_payments: HashMap, } -/// Events which we process internally but cannot be procsesed immediately at the generation site -/// for some reason. They are handled in timer_tick_occurred, so may be processed with -/// quite some time lag. +/// Events which we process internally but cannot be processed immediately at the generation site +/// usually because we're running pre-full-init. They are handled immediately once we detect we are +/// running normally, and specifically must be processed before any other non-background +/// [`ChannelMonitorUpdate`]s are applied. enum BackgroundEvent { - /// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder - /// commitment transaction. - ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)), + /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from + /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public + /// key to handle channel resumption, whereas if the channel has been force-closed we do not + /// need the counterparty node_id. + /// + /// Note that any such events are lost on shutdown, so in general they must be updates which + /// are regenerated on startup. + ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), + /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the + /// channel to continue normal operation. + /// + /// In general this should be used rather than + /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the + /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`] + /// error the other variant is acceptable. + /// + /// Note that any such events are lost on shutdown, so in general they must be updates which + /// are regenerated on startup. + MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: PublicKey, + funding_txo: OutPoint, + update: ChannelMonitorUpdate + }, } #[derive(Debug)] @@ -512,15 +532,77 @@ pub(crate) enum MonitorUpdateCompletionAction { /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate /// event can be generated. PaymentClaimed { payment_hash: PaymentHash }, - /// Indicates an [`events::Event`] should be surfaced to the user. - EmitEvent { event: events::Event }, + /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the + /// operation of another channel. + /// + /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge + /// from completing a monitor update which removes the payment preimage until the inbound edge + /// completes a monitor update containing the payment preimage. In that case, after the inbound + /// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the + /// outbound edge. + EmitEventAndFreeOtherChannel { + event: events::Event, + downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>, + }, } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, PaymentClaimed) => { (0, payment_hash, required) }, - (2, EmitEvent) => { (0, event, upgradable_required) }, + (2, EmitEventAndFreeOtherChannel) => { + (0, event, upgradable_required), + // LDK prior to 0.0.116 did not have this field as the monitor update application order was + // required by clients. If we downgrade to something prior to 0.0.116 this may result in + // monitor updates which aren't properly blocked or resumed, however that's fine - we don't + // support async monitor updates even in LDK 0.0.116 and once we do we'll require no + // downgrades to prior versions. + (1, downstream_counterparty_and_funding_outpoint, option), + }, +); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum EventCompletionAction { + ReleaseRAAChannelMonitorUpdate { + counterparty_node_id: PublicKey, + channel_funding_outpoint: OutPoint, + }, +} +impl_writeable_tlv_based_enum!(EventCompletionAction, + (0, ReleaseRAAChannelMonitorUpdate) => { + (0, channel_funding_outpoint, required), + (2, counterparty_node_id, required), + }; ); +#[derive(Clone, PartialEq, Eq, Debug)] +/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track +/// the blocked action here. See enum variants for more info. +pub(crate) enum RAAMonitorUpdateBlockingAction { + /// A forwarded payment was claimed. We block the downstream channel completing its monitor + /// update which removes the HTLC preimage until the upstream channel has gotten the preimage + /// durably to disk. + ForwardedPaymentInboundClaim { + /// The upstream channel ID (i.e. the inbound edge). + channel_id: [u8; 32], + /// The HTLC ID on the inbound edge. + htlc_id: u64, + }, +} + +impl RAAMonitorUpdateBlockingAction { + #[allow(unused)] + fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self { + Self::ForwardedPaymentInboundClaim { + channel_id: prev_hop.outpoint.to_channel_id(), + htlc_id: prev_hop.htlc_id, + } + } +} + +impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction, + (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) } +;); + + /// State we hold per-peer. pub(super) struct PeerState { /// `temporary_channel_id` or `channel_id` -> `channel`. @@ -549,6 +631,11 @@ pub(super) struct PeerState { /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure /// duplicates do not occur, so such channels should fail without a monitor update completing. monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec>, + /// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have + /// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update + /// will remove a preimage that needs to be durably in an upstream channel first), we put an + /// entry here to note that the channel with the key's ID is blocked on a set of actions. + actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec>, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. @@ -608,7 +695,9 @@ pub type SimpleArcChannelManager = ChannelManager< Arc>>, Arc, - Arc>>, Arc>>> + Arc>>, Arc>>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer>>, Arc>, >>, Arc >; @@ -624,42 +713,46 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; +macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. -#[cfg(any(test, feature = "_test_utils"))] -pub trait AChannelManager { - type Watch: chain::Watch; +$vis trait AChannelManager { + type Watch: chain::Watch + ?Sized; type M: Deref; - type Broadcaster: BroadcasterInterface; + type Broadcaster: BroadcasterInterface + ?Sized; type T: Deref; - type EntropySource: EntropySource; + type EntropySource: EntropySource + ?Sized; type ES: Deref; - type NodeSigner: NodeSigner; + type NodeSigner: NodeSigner + ?Sized; type NS: Deref; - type Signer: WriteableEcdsaChannelSigner; - type SignerProvider: SignerProvider; + type Signer: WriteableEcdsaChannelSigner + Sized; + type SignerProvider: SignerProvider + ?Sized; type SP: Deref; - type FeeEstimator: FeeEstimator; + type FeeEstimator: FeeEstimator + ?Sized; type F: Deref; - type Router: Router; + type Router: Router + ?Sized; type R: Deref; - type Logger: Logger; + type Logger: Logger + ?Sized; type L: Deref; fn get_cm(&self) -> &ChannelManager; } +} } #[cfg(any(test, feature = "_test_utils"))] +define_test_pub_trait!(pub); +#[cfg(not(any(test, feature = "_test_utils")))] +define_test_pub_trait!(pub(crate)); impl AChannelManager for ChannelManager where - M::Target: chain::Watch<::Signer> + Sized, - T::Target: BroadcasterInterface + Sized, - ES::Target: EntropySource + Sized, - NS::Target: NodeSigner + Sized, - SP::Target: SignerProvider + Sized, - F::Target: FeeEstimator + Sized, - R::Target: Router + Sized, - L::Target: Logger + Sized, + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + ES::Target: EntropySource, + NS::Target: NodeSigner, + SP::Target: SignerProvider, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { type Watch = M::Target; type M = M; @@ -932,9 +1025,31 @@ where #[cfg(any(test, feature = "_test_utils"))] pub(super) per_peer_state: FairRwLock::Signer>>>>, + /// The set of events which we need to give to the user to handle. In some cases an event may + /// require some further action after the user handles it (currently only blocking a monitor + /// update from being handed to the user to ensure the included changes to the channel state + /// are handled by the user before they're persisted durably to disk). In that case, the second + /// element in the tuple is set to `Some` with further details of the action. + /// + /// Note that events MUST NOT be removed from pending_events after deserialization, as they + /// could be in the middle of being processed without the direct mutex held. + /// /// See `ChannelManager` struct-level documentation for lock order requirements. - pending_events: Mutex>, + pending_events: Mutex)>>, + /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously. + pending_events_processor: AtomicBool, + + /// If we are running during init (either directly during the deserialization method or in + /// block connection methods which run after deserialization but before normal operation) we + /// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow - + /// prior to normal operation the user may not have loaded the [`ChannelMonitor`]s into their + /// [`ChainMonitor`] and thus attempting to update it will fail or panic. + /// + /// Thus, we place them here to be handled as soon as possible once we are running normally. + /// /// See `ChannelManager` struct-level documentation for lock order requirements. + /// + /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor pending_background_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. /// Essentially just when we're serializing ourselves out. @@ -944,6 +1059,9 @@ where /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, + #[cfg(debug_assertions)] + background_events_processed_since_startup: AtomicBool, + persistence_notifier: Notifier, entropy_source: ES, @@ -970,6 +1088,7 @@ pub struct ChainParameters { } #[derive(Copy, Clone, PartialEq)] +#[must_use] enum NotifyOption { DoPersist, SkipPersist, @@ -993,10 +1112,20 @@ struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused - fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { - PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist }) + fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); + let _ = cm.get_cm().process_background_events(); // We always persist + + PersistenceNotifierGuard { + persistence_notifier: &cm.get_cm().persistence_notifier, + should_persist: || -> NotifyOption { NotifyOption::DoPersist }, + _read_guard: read_guard, + } + } + /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated, + /// [`ChannelManager::process_background_events`] MUST be called first. fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { let read_guard = lock.read().unwrap(); @@ -1244,8 +1373,14 @@ pub struct ChannelDetails { /// the current state and per-HTLC limit(s). This is intended for use when routing, allowing us /// to use a limit as close as possible to the HTLC limit we can currently send. /// - /// See also [`ChannelDetails::balance_msat`] and [`ChannelDetails::outbound_capacity_msat`]. + /// See also [`ChannelDetails::next_outbound_htlc_minimum_msat`], + /// [`ChannelDetails::balance_msat`], and [`ChannelDetails::outbound_capacity_msat`]. pub next_outbound_htlc_limit_msat: u64, + /// The minimum value for sending a single HTLC to the remote peer. This is the equivalent of + /// [`ChannelDetails::next_outbound_htlc_limit_msat`] but represents a lower-bound, rather than + /// an upper-bound. This is intended for use when routing, allowing us to ensure we pick a + /// route which is valid. + pub next_outbound_htlc_minimum_msat: u64, /// The available inbound capacity for the remote peer to send HTLCs to us. This does not /// include any pending HTLCs which are not yet fully resolved (and, thus, whose balance is not /// available for inclusion in new inbound HTLCs). @@ -1365,6 +1500,7 @@ impl ChannelDetails { inbound_capacity_msat: balance.inbound_capacity_msat, outbound_capacity_msat: balance.outbound_capacity_msat, next_outbound_htlc_limit_msat: balance.next_outbound_htlc_limit_msat, + next_outbound_htlc_minimum_msat: balance.next_outbound_htlc_minimum_msat, user_channel_id: channel.get_user_id(), confirmations_required: channel.minimum_depth(), confirmations: Some(channel.get_funding_tx_confirmations(best_block_height)), @@ -1412,7 +1548,7 @@ pub enum RecentPaymentDetails { /// Route hints used in constructing invoices for [phantom node payents]. /// -/// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager +/// [phantom node payments]: crate::sign::PhantomKeysManager #[derive(Clone)] pub struct PhantomRouteHints { /// The list of channels to be included in the invoice route hints. @@ -1444,10 +1580,10 @@ macro_rules! handle_error { }); } if let Some((channel_id, user_channel_id)) = chan_id { - $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { + $self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed { channel_id, user_channel_id, reason: ClosureReason::ProcessingError { err: err.err.clone() } - }); + }, None)); } } @@ -1579,13 +1715,13 @@ macro_rules! send_channel_ready { macro_rules! emit_channel_pending_event { ($locked_events: expr, $channel: expr) => { if $channel.should_emit_channel_pending_event() { - $locked_events.push(events::Event::ChannelPending { + $locked_events.push_back((events::Event::ChannelPending { channel_id: $channel.channel_id(), former_temporary_channel_id: $channel.temporary_channel_id(), counterparty_node_id: $channel.get_counterparty_node_id(), user_channel_id: $channel.get_user_id(), funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(), - }); + }, None)); $channel.set_channel_pending_event_emitted(); } } @@ -1595,12 +1731,12 @@ macro_rules! emit_channel_ready_event { ($locked_events: expr, $channel: expr) => { if $channel.should_emit_channel_ready_event() { debug_assert!($channel.channel_pending_event_emitted()); - $locked_events.push(events::Event::ChannelReady { + $locked_events.push_back((events::Event::ChannelReady { channel_id: $channel.channel_id(), user_channel_id: $channel.get_user_id(), counterparty_node_id: $channel.get_counterparty_node_id(), channel_type: $channel.get_channel_type().clone(), - }); + }, None)); $channel.set_channel_ready_event_emitted(); } } @@ -1660,6 +1796,9 @@ macro_rules! handle_new_monitor_update { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); + #[cfg(debug_assertions)] { + debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); + } match $update_res { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", @@ -1678,11 +1817,8 @@ macro_rules! handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - if ($update_id == 0 || $chan.get_next_monitor_update() - .expect("We can't be processing a monitor update if it isn't queued") - .update_id == $update_id) && - $chan.get_latest_monitor_update_id() == $update_id - { + $chan.complete_one_mon_update($update_id); + if $chan.no_monitor_updates_pending() { handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); } Ok(()) @@ -1696,30 +1832,62 @@ macro_rules! handle_new_monitor_update { macro_rules! process_events_body { ($self: expr, $event_to_handle: expr, $handle_event: expr) => { - // We'll acquire our total consistency lock until the returned future completes so that - // we can be sure no other persists happen while processing events. - let _read_guard = $self.total_consistency_lock.read().unwrap(); + let mut processed_all_events = false; + while !processed_all_events { + if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; + } - let mut result = NotifyOption::SkipPersist; + let mut result = NotifyOption::SkipPersist; - // TODO: This behavior should be documented. It's unintuitive that we query - // ChannelMonitors when clearing other events. - if $self.process_pending_monitor_events() { - result = NotifyOption::DoPersist; - } + { + // We'll acquire our total consistency lock so that we can be sure no other + // persists happen while processing monitor events. + let _read_guard = $self.total_consistency_lock.read().unwrap(); + + // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must + // ensure any startup-generated background events are handled first. + if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; } + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if $self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + } - let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]); - if !pending_events.is_empty() { - result = NotifyOption::DoPersist; - } + let pending_events = $self.pending_events.lock().unwrap().clone(); + let num_events = pending_events.len(); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } - for event in pending_events { - $event_to_handle = event; - $handle_event; - } + let mut post_event_actions = Vec::new(); + + for (event, action_opt) in pending_events { + $event_to_handle = event; + $handle_event; + if let Some(action) = action_opt { + post_event_actions.push(action); + } + } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + { + let mut pending_events = $self.pending_events.lock().unwrap(); + pending_events.drain(..num_events); + processed_all_events = pending_events.is_empty(); + $self.pending_events_processor.store(false, Ordering::Release); + } + + if !post_event_actions.is_empty() { + $self.handle_post_event_actions(post_event_actions); + // If we had some actions, go around again as we may have more events now + processed_all_events = false; + } + + if result == NotifyOption::DoPersist { + $self.persistence_notifier.notify(); + } } } } @@ -1786,9 +1954,12 @@ where per_peer_state: FairRwLock::new(HashMap::new()), - pending_events: Mutex::new(Vec::new()), + pending_events: Mutex::new(VecDeque::new()), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), + #[cfg(debug_assertions)] + background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), entropy_source, @@ -1834,6 +2005,10 @@ where /// Raises [`APIError::APIMisuseError`] when `channel_value_satoshis` > 2**24 or `push_msat` is /// greater than `channel_value_satoshis * 1k` or `channel_value_satoshis < 1000`. /// + /// Raises [`APIError::ChannelUnavailable`] if the channel cannot be opened due to failing to + /// generate a shutdown scriptpubkey or destination script set by + /// [`SignerProvider::get_shutdown_scriptpubkey`] or [`SignerProvider::get_destination_script`]. + /// /// Note that we do not check if you are currently connected to the given peer. If no /// connection is available, the outbound `open_channel` message may fail to send, resulting in /// the channel eventually being silently forgotten (dropped on reload). @@ -1853,7 +2028,7 @@ where return Err(APIError::APIMisuseError { err: format!("Channel value must be at least 1000 satoshis. It was {}", channel_value_satoshis) }); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); // We want to make sure the lock is actually acquired by PersistenceNotifierGuard. debug_assert!(&self.total_consistency_lock.try_write().is_err()); @@ -1993,19 +2168,21 @@ where let mut pending_events_lock = self.pending_events.lock().unwrap(); match channel.unbroadcasted_funding() { Some(transaction) => { - pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction }) + pending_events_lock.push_back((events::Event::DiscardFunding { + channel_id: channel.channel_id(), transaction + }, None)); }, None => {}, } - pending_events_lock.push(events::Event::ChannelClosed { + pending_events_lock.push_back((events::Event::ChannelClosed { channel_id: channel.channel_id(), user_channel_id: channel.get_user_id(), reason: closure_reason - }); + }, None)); } - fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option, override_shutdown_script: Option) -> Result<(), APIError> { + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { @@ -2021,7 +2198,7 @@ where let funding_txo_opt = chan_entry.get().get_funding_txo(); let their_features = &peer_state.latest_features; let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() - .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?; + .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; failed_htlcs = htlcs; // We can send the `shutdown` message before updating the `ChannelMonitor` @@ -2078,12 +2255,17 @@ where /// /// May generate a [`SendShutdown`] message event on success, which should be relayed. /// + /// Raises [`APIError::ChannelUnavailable`] if the channel cannot be closed due to failing to + /// generate a shutdown scriptpubkey or destination script set by + /// [`SignerProvider::get_shutdown_scriptpubkey`]. A force-closure may be needed to close the + /// channel. + /// /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal /// [`SendShutdown`]: crate::events::MessageSendEvent::SendShutdown pub fn close_channel(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey) -> Result<(), APIError> { - self.close_channel_internal(channel_id, counterparty_node_id, None) + self.close_channel_internal(channel_id, counterparty_node_id, None, None) } /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs @@ -2100,14 +2282,24 @@ where /// transaction feerate below `target_feerate_sat_per_1000_weight` (or the feerate which /// will appear on a force-closure transaction, whichever is lower). /// + /// The `shutdown_script` provided will be used as the `scriptPubKey` for the closing transaction. + /// Will fail if a shutdown script has already been set for this channel by + /// ['ChannelHandshakeConfig::commit_upfront_shutdown_pubkey`]. The given shutdown script must + /// also be compatible with our and the counterparty's features. + /// /// May generate a [`SendShutdown`] message event on success, which should be relayed. /// + /// Raises [`APIError::ChannelUnavailable`] if the channel cannot be closed due to failing to + /// generate a shutdown scriptpubkey or destination script set by + /// [`SignerProvider::get_shutdown_scriptpubkey`]. A force-closure may be needed to close the + /// channel. + /// /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal /// [`SendShutdown`]: crate::events::MessageSendEvent::SendShutdown - pub fn close_channel_with_target_feerate(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: u32) -> Result<(), APIError> { - self.close_channel_internal(channel_id, counterparty_node_id, Some(target_feerate_sats_per_1000_weight)) + pub fn close_channel_with_feerate_and_script(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option, shutdown_script: Option) -> Result<(), APIError> { + self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } #[inline] @@ -2120,7 +2312,7 @@ where let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } - if let Some((funding_txo, monitor_update)) = monitor_update_option { + if let Some((_, funding_txo, monitor_update)) = monitor_update_option { // There isn't anything we can do if we get an update failure - we're already // force-closing. The monitor update on the required in-memory copy should broadcast // the latest local state, which is the best we can do anyway. Thus, it is safe to @@ -2163,7 +2355,7 @@ where } fn force_close_sending_error(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, broadcast: bool) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None, broadcast) { Ok(counterparty_node_id) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -2655,10 +2847,9 @@ where let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv) .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected".to_owned()})?; let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(path, total_value, recipient_onion, cur_height, keysend_preimage)?; - if onion_utils::route_size_insane(&onion_payloads) { - return Err(APIError::InvalidRoute{err: "Route size too large considering onion data".to_owned()}); - } - let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); + + let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash) + .map_err(|_| APIError::InvalidRoute { err: "Route size too large considering onion data".to_owned()})?; let err: Result<(), _> = loop { let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.hops.first().unwrap().short_channel_id) { @@ -2773,18 +2964,18 @@ where /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment_with_route(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } - /// Similar to [`ChannelManager::send_payment`], but will automatically find a route based on + /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on /// `route_params` and retry failed payment paths based on `retry_strategy`. pub fn send_payment(&self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), @@ -2797,7 +2988,7 @@ where #[cfg(test)] pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2832,7 +3023,7 @@ where /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn abandon_payment(&self, payment_id: PaymentId) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.abandon_payment(payment_id, PaymentFailureReason::UserAbandoned, &self.pending_events); } @@ -2853,7 +3044,7 @@ where /// [`send_payment`]: Self::send_payment pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment_with_route( route, payment_preimage, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height, @@ -2870,7 +3061,7 @@ where /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, @@ -2884,7 +3075,7 @@ where /// us to easily discern them from real payments. pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2995,7 +3186,7 @@ where /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); for inp in funding_transaction.input.iter() { if inp.witness.is_empty() { @@ -3006,16 +3197,23 @@ where } { let height = self.best_block.read().unwrap().height(); - // Transactions are evaluated as final by network mempools at the next block. However, the modules - // constituting our Lightning node might not have perfect sync about their blockchain views. Thus, if - // the wallet module is in advance on the LDK view, allow one more block of headroom. - if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 2 { + // Transactions are evaluated as final by network mempools if their locktime is strictly + // lower than the next block height. However, the modules constituting our Lightning + // node might not have perfect sync about their blockchain views. Thus, if the wallet + // module is ahead of LDK, only allow one more block of headroom. + if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 { return Err(APIError::APIMisuseError { err: "Funding transaction absolute timelock is non-final".to_owned() }); } } self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| { + if tx.output.len() > u16::max_value() as usize { + return Err(APIError::APIMisuseError { + err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() + }); + } + let mut output_index = None; let expected_spk = chan.get_funding_redeemscript().to_v0_p2wsh(); for (idx, outp) in tx.output.iter().enumerate() { @@ -3025,11 +3223,6 @@ where err: "Multiple outputs matched the expected script and value".to_owned() }); } - if idx > u16::max_value() as usize { - return Err(APIError::APIMisuseError { - err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() - }); - } output_index = Some(idx as u16); } } @@ -3042,7 +3235,7 @@ where }) } - /// Atomically updates the [`ChannelConfig`] for the given channels. + /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels. /// /// Once the updates are applied, each eligible channel (advertised with a known short channel /// ID and a change in [`forwarding_fee_proportional_millionths`], [`forwarding_fee_base_msat`], @@ -3064,18 +3257,16 @@ where /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`ChannelUnavailable`]: APIError::ChannelUnavailable /// [`APIMisuseError`]: APIError::APIMisuseError - pub fn update_channel_config( - &self, counterparty_node_id: &PublicKey, channel_ids: &[[u8; 32]], config: &ChannelConfig, + pub fn update_partial_channel_config( + &self, counterparty_node_id: &PublicKey, channel_ids: &[[u8; 32]], config_update: &ChannelConfigUpdate, ) -> Result<(), APIError> { - if config.cltv_expiry_delta < MIN_CLTV_EXPIRY_DELTA { + if config_update.cltv_expiry_delta.map(|delta| delta < MIN_CLTV_EXPIRY_DELTA).unwrap_or(false) { return Err(APIError::APIMisuseError { err: format!("The chosen CLTV expiry delta is below the minimum of {}", MIN_CLTV_EXPIRY_DELTA), }); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop( - &self.total_consistency_lock, &self.persistence_notifier, - ); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; @@ -3090,7 +3281,9 @@ where } for channel_id in channel_ids { let channel = peer_state.channel_by_id.get_mut(channel_id).unwrap(); - if !channel.update_config(config) { + let mut config = channel.config(); + config.apply(config_update); + if !channel.update_config(&config) { continue; } if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { @@ -3105,6 +3298,34 @@ where Ok(()) } + /// Atomically updates the [`ChannelConfig`] for the given channels. + /// + /// Once the updates are applied, each eligible channel (advertised with a known short channel + /// ID and a change in [`forwarding_fee_proportional_millionths`], [`forwarding_fee_base_msat`], + /// or [`cltv_expiry_delta`]) has a [`BroadcastChannelUpdate`] event message generated + /// containing the new [`ChannelUpdate`] message which should be broadcast to the network. + /// + /// Returns [`ChannelUnavailable`] when a channel is not found or an incorrect + /// `counterparty_node_id` is provided. + /// + /// Returns [`APIMisuseError`] when a [`cltv_expiry_delta`] update is to be applied with a value + /// below [`MIN_CLTV_EXPIRY_DELTA`]. + /// + /// If an error is returned, none of the updates should be considered applied. + /// + /// [`forwarding_fee_proportional_millionths`]: ChannelConfig::forwarding_fee_proportional_millionths + /// [`forwarding_fee_base_msat`]: ChannelConfig::forwarding_fee_base_msat + /// [`cltv_expiry_delta`]: ChannelConfig::cltv_expiry_delta + /// [`BroadcastChannelUpdate`]: events::MessageSendEvent::BroadcastChannelUpdate + /// [`ChannelUpdate`]: msgs::ChannelUpdate + /// [`ChannelUnavailable`]: APIError::ChannelUnavailable + /// [`APIMisuseError`]: APIError::APIMisuseError + pub fn update_channel_config( + &self, counterparty_node_id: &PublicKey, channel_ids: &[[u8; 32]], config: &ChannelConfig, + ) -> Result<(), APIError> { + return self.update_partial_channel_config(counterparty_node_id, channel_ids, &(*config).into()); + } + /// Attempts to forward an intercepted HTLC over the provided channel id and with the provided /// amount to forward. Should only be called in response to an [`HTLCIntercepted`] event. /// @@ -3128,7 +3349,7 @@ where // TODO: when we move to deciding the best outbound channel at forward time, only take // `next_node_id` and not `next_hop_channel_id` pub fn forward_intercepted_htlc(&self, intercept_id: InterceptId, next_hop_channel_id: &[u8; 32], next_node_id: PublicKey, amt_to_forward_msat: u64) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let next_hop_scid = { let peer_state_lock = self.per_peer_state.read().unwrap(); @@ -3184,7 +3405,7 @@ where /// /// [`HTLCIntercepted`]: events::Event::HTLCIntercepted pub fn fail_intercepted_htlc(&self, intercept_id: InterceptId) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let payment = self.pending_intercepted_htlcs.lock().unwrap().remove(&intercept_id) .ok_or_else(|| APIError::APIMisuseError { @@ -3213,9 +3434,9 @@ where /// Should only really ever be called in response to a PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let mut new_events = Vec::new(); + let mut new_events = VecDeque::new(); let mut failed_forwards = Vec::new(); let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new(); { @@ -3541,7 +3762,7 @@ where htlcs.push(claimable_htlc); let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum(); htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat)); - new_events.push(events::Event::PaymentClaimable { + new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, purpose: purpose(), @@ -3550,7 +3771,7 @@ where via_user_channel_id: Some(prev_user_channel_id), claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER), onion_fields: claimable_payment.onion_fields.clone(), - }); + }, None)); payment_claimable_generated = true; } else { // Nothing to do - we haven't reached the total @@ -3611,7 +3832,7 @@ where htlcs: vec![claimable_htlc], }); let prev_channel_id = prev_funding_outpoint.to_channel_id(); - new_events.push(events::Event::PaymentClaimable { + new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, amount_msat, @@ -3620,7 +3841,7 @@ where via_user_channel_id: Some(prev_user_channel_id), claim_deadline, onion_fields: Some(onion_fields), - }); + }, None)); }, hash_map::Entry::Occupied(_) => { log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} for a duplicative payment hash", log_bytes!(payment_hash.0)); @@ -3684,35 +3905,63 @@ where events.append(&mut new_events); } - /// Free the background events, generally called from timer_tick_occurred. - /// - /// Exposed for testing to allow us to process events quickly without generating accidental - /// BroadcastChannelUpdate events in timer_tick_occurred. + /// Free the background events, generally called from [`PersistenceNotifierGuard`] constructors. /// /// Expects the caller to have a total_consistency_lock read lock. - fn process_background_events(&self) -> bool { + fn process_background_events(&self) -> NotifyOption { + debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread); + + #[cfg(debug_assertions)] + self.background_events_processed_since_startup.store(true, Ordering::Release); + let mut background_events = Vec::new(); mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); if background_events.is_empty() { - return false; + return NotifyOption::SkipPersist; } for event in background_events.drain(..) { match event { - BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => { + BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { // The channel has already been closed, so no use bothering to care about the // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); }, + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { + let update_res = self.chain_monitor.update_channel(funding_txo, &update); + + let res = { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { + hash_map::Entry::Occupied(mut chan) => { + handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + }, + hash_map::Entry::Vacant(_) => Ok(()), + } + } else { Ok(()) } + }; + // TODO: If this channel has since closed, we're likely providing a payment + // preimage update, which we must ensure is durable! We currently don't, + // however, ensure that. + if res.is_err() { + log_error!(self.logger, + "Failed to provide ChannelMonitorUpdate to closed channel! This likely lost us a payment preimage!"); + } + let _ = handle_error!(self, res, counterparty_node_id); + }, } } - true + NotifyOption::DoPersist } #[cfg(any(test, feature = "_test_utils"))] /// Process background events, for functional testing pub fn test_process_background_events(&self) { - self.process_background_events(); + let _lck = self.total_consistency_lock.read().unwrap(); + let _ = self.process_background_events(); } fn update_channel_fee(&self, chan_id: &[u8; 32], chan: &mut Channel<::Signer>, new_feerate: u32) -> NotifyOption { @@ -3742,7 +3991,7 @@ where /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = NotifyOption::SkipPersist; + let mut should_persist = self.process_background_events(); let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -3778,8 +4027,7 @@ where /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = NotifyOption::SkipPersist; - if self.process_background_events() { should_persist = NotifyOption::DoPersist; } + let mut should_persist = self.process_background_events(); let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -3843,6 +4091,20 @@ where chan.maybe_expire_prev_config(); + if chan.should_disconnect_peer_awaiting_response() { + log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}", + counterparty_node_id, log_bytes!(*chan_id)); + pending_msg_events.push(MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: *chan_id, + data: "Disconnecting due to timeout awaiting response".to_owned(), + }, + }, + }); + } + true }); if peer_state.ok_to_remove(true) { @@ -3951,7 +4213,7 @@ where /// /// See [`FailureCode`] for valid failure codes. pub fn fail_htlc_backwards_with_reason(&self, payment_hash: &PaymentHash, failure_code: FailureCode) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let removed_source = self.claimable_payments.lock().unwrap().claimable_payments.remove(payment_hash); if let Some(payment) = removed_source { @@ -4098,10 +4360,10 @@ where mem::drop(forward_htlcs); if push_forward_ev { self.push_pending_forwards_ev(); } let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::HTLCHandlingFailed { + pending_events.push_back((events::Event::HTLCHandlingFailed { prev_channel_id: outpoint.to_channel_id(), failed_next_destination: destination, - }); + }, None)); }, } } @@ -4128,7 +4390,7 @@ where pub fn claim_funds(&self, payment_preimage: PaymentPreimage) { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut sources = { let mut claimable_payments = self.claimable_payments.lock().unwrap(); @@ -4333,16 +4595,16 @@ where Some(claimed_htlc_value - forwarded_htlc_value) } else { None }; - let prev_channel_id = Some(prev_outpoint.to_channel_id()); - let next_channel_id = Some(next_channel_id); - - Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded { - fee_earned_msat, - claim_from_onchain_tx: from_onchain, - prev_channel_id, - next_channel_id, - outbound_amount_forwarded_msat: forwarded_htlc_value_msat, - }}) + Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + event: events::Event::PaymentForwarded { + fee_earned_msat, + claim_from_onchain_tx: from_onchain, + prev_channel_id: Some(prev_outpoint.to_channel_id()), + next_channel_id: Some(next_channel_id), + outbound_amount_forwarded_msat: forwarded_htlc_value_msat, + }, + downstream_counterparty_and_funding_outpoint: None, + }) } else { None } }); if let Err((pk, err)) = res { @@ -4364,13 +4626,18 @@ where MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => { let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); if let Some(ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id }) = payment { - self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed { + self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed { payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id), - }); + }, None)); } }, - MonitorUpdateCompletionAction::EmitEvent { event } => { - self.pending_events.lock().unwrap().push(event); + MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + event, downstream_counterparty_and_funding_outpoint + } => { + self.pending_events.lock().unwrap().push_back((event, None)); + if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint { + self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker)); + } }, } } @@ -4439,7 +4706,7 @@ where if let Some(tx) = funding_broadcastable { log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid()); - self.tx_broadcaster.broadcast_transaction(&tx); + self.tx_broadcaster.broadcast_transactions(&[&tx]); } { @@ -4529,7 +4796,7 @@ where } fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty()); let per_peer_state = self.per_peer_state.read().unwrap(); @@ -4694,15 +4961,13 @@ where }); } else { let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push( - events::Event::OpenChannelRequest { - temporary_channel_id: msg.temporary_channel_id.clone(), - counterparty_node_id: counterparty_node_id.clone(), - funding_satoshis: msg.funding_satoshis, - push_msat: msg.push_msat, - channel_type: channel.get_channel_type().clone(), - } - ); + pending_events.push_back((events::Event::OpenChannelRequest { + temporary_channel_id: msg.temporary_channel_id.clone(), + counterparty_node_id: counterparty_node_id.clone(), + funding_satoshis: msg.funding_satoshis, + push_msat: msg.push_msat, + channel_type: channel.get_channel_type().clone(), + }, None)); } entry.insert(channel); @@ -4730,13 +4995,13 @@ where } }; let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::FundingGenerationReady { + pending_events.push_back((events::Event::FundingGenerationReady { temporary_channel_id: msg.temporary_channel_id, counterparty_node_id: *counterparty_node_id, channel_value_satoshis: value, output_script, user_channel_id: user_id, - }); + }, None)); Ok(()) } @@ -4971,7 +5236,7 @@ where }; if let Some(broadcast_tx) = tx { log_info!(self.logger, "Broadcasting {}", log_tx!(broadcast_tx)); - self.tx_broadcaster.broadcast_transaction(&broadcast_tx); + self.tx_broadcaster.broadcast_transactions(&[&broadcast_tx]); } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { @@ -5110,11 +5375,13 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().get_funding_txo(); - let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); + if let Some(monitor_update) = monitor_update_opt { + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, per_peer_state, chan) + } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5124,7 +5391,7 @@ where fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) { for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards { let mut push_forward_event = false; - let mut new_intercept_events = Vec::new(); + let mut new_intercept_events = VecDeque::new(); let mut failed_intercept_forwards = Vec::new(); if !pending_forwards.is_empty() { for (forward_info, prev_htlc_id) in pending_forwards.drain(..) { @@ -5151,13 +5418,13 @@ where let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap(); match pending_intercepts.entry(intercept_id) { hash_map::Entry::Vacant(entry) => { - new_intercept_events.push(events::Event::HTLCIntercepted { + new_intercept_events.push_back((events::Event::HTLCIntercepted { requested_next_hop_scid: scid, payment_hash: forward_info.payment_hash, inbound_amount_msat: forward_info.incoming_amt_msat.unwrap(), expected_outbound_amount_msat: forward_info.outgoing_amt_msat, intercept_id - }); + }, None)); entry.insert(PendingAddHTLCInfo { prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info }); }, @@ -5207,14 +5474,32 @@ where fn push_pending_forwards_ev(&self) { let mut pending_events = self.pending_events.lock().unwrap(); let forward_ev_exists = pending_events.iter() - .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) + .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) .is_some(); if !forward_ev_exists { - pending_events.push(events::Event::PendingHTLCsForwardable { + pending_events.push_back((events::Event::PendingHTLCsForwardable { time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), - }); - } + }, None)); + } + } + + /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote + /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event + /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of + /// the [`ChannelMonitorUpdate`] in question. + fn raa_monitor_updates_held(&self, + actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec>, + channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey + ) -> bool { + actions_blocking_raa_monitor_updates + .get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false) + || self.pending_events.lock().unwrap().iter().any(|(_, action)| { + action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate { + channel_funding_outpoint, + counterparty_node_id, + }) + }) } fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { @@ -5229,11 +5514,13 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().get_funding_txo(); - let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - let res = handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let res = if let Some(monitor_update) = monitor_update_opt { + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, + peer_state_lock, peer_state, per_peer_state, chan) + } else { Ok(()) }; (htlcs_to_fail, res) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5474,13 +5761,8 @@ where /// update events as a separate process method here. #[cfg(fuzzing)] pub fn process_monitor_events(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - if self.process_pending_monitor_events() { - NotifyOption::DoPersist - } else { - NotifyOption::SkipPersist - } - }); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + self.process_pending_monitor_events(); } /// Check the holding cell in each channel and free any pending HTLCs in them if possible. @@ -5577,7 +5859,7 @@ where self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure); log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); - self.tx_broadcaster.broadcast_transaction(&tx); + self.tx_broadcaster.broadcast_transactions(&[&tx]); update_maps_on_chan_removal!(self, chan); false } else { true } @@ -5612,12 +5894,15 @@ where // Channel::force_shutdown tries to make us do) as we may still be in initialization, // so we track the update internally and handle it when the user next calls // timer_tick_occurred, guaranteeing we're running normally. - if let Some((funding_txo, update)) = failure.0.take() { + if let Some((counterparty_node_id, funding_txo, update)) = failure.0.take() { assert_eq!(update.updates.len(), 1); if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] { assert!(should_broadcast); } else { unreachable!(); } - self.pending_background_events.lock().unwrap().push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, update))); + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, funding_txo, update + }); } self.finish_force_close_channel(failure); } @@ -5632,7 +5917,7 @@ where let payment_secret = PaymentSecret(self.entropy_source.get_secure_random_bytes()); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut payment_secrets = self.pending_inbound_payments.lock().unwrap(); match payment_secrets.entry(payment_hash) { hash_map::Entry::Vacant(e) => { @@ -5788,7 +6073,7 @@ where /// Gets a fake short channel id for use in receiving [phantom node payments]. These fake scids /// are used when constructing the phantom invoice's route hints. /// - /// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager + /// [phantom node payments]: crate::sign::PhantomKeysManager pub fn get_phantom_scid(&self) -> u64 { let best_block_height = self.best_block.read().unwrap().height(); let short_to_chan_info = self.short_to_chan_info.read().unwrap(); @@ -5804,7 +6089,7 @@ where /// Gets route hints for use in receiving [phantom node payments]. /// - /// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager + /// [phantom node payments]: crate::sign::PhantomKeysManager pub fn get_phantom_route_hints(&self) -> PhantomRouteHints { PhantomRouteHints { channels: self.list_usable_channels(), @@ -5862,13 +6147,13 @@ where #[cfg(feature = "_test_utils")] pub fn push_pending_event(&self, event: events::Event) { let mut events = self.pending_events.lock().unwrap(); - events.push(event); + events.push_back((event, None)); } #[cfg(test)] pub fn pop_pending_event(&self) -> Option { let mut events = self.pending_events.lock().unwrap(); - if events.is_empty() { None } else { Some(events.remove(0)) } + events.pop_front().map(|(e, _)| e) } #[cfg(test)] @@ -5881,6 +6166,84 @@ where self.pending_outbound_payments.clear_pending_payments() } + /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an + /// [`Event`] being handled) completes, this should be called to restore the channel to normal + /// operation. It will double-check that nothing *else* is also blocking the same channel from + /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly. + fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { + let mut errors = Vec::new(); + loop { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lck = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lck; + + if let Some(blocker) = completed_blocker.take() { + // Only do this on the first iteration of the loop. + if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates + .get_mut(&channel_funding_outpoint.to_channel_id()) + { + blockers.retain(|iter| iter != &blocker); + } + } + + if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates, + channel_funding_outpoint, counterparty_node_id) { + // Check that, while holding the peer lock, we don't have anything else + // blocking monitor updates for this channel. If we do, release the monitor + // update(s) when those blockers complete. + log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first", + log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); + break; + } + + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) { + debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint); + if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { + log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", + log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); + let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); + let update_id = monitor_update.update_id; + if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + peer_state_lck, peer_state, per_peer_state, chan) + { + errors.push((e, counterparty_node_id)); + } + if further_update_exists { + // If there are more `ChannelMonitorUpdate`s to process, restart at the + // top of the loop. + continue; + } + } else { + log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update", + log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); + } + } + } else { + log_debug!(self.logger, + "Got a release post-RAA monitor update for peer {} but the channel is gone", + log_pubkey!(counterparty_node_id)); + } + break; + } + for (err, counterparty_node_id) in errors { + let res = Err::<(), _>(err); + let _ = handle_error!(self, res, counterparty_node_id); + } + } + + fn handle_post_event_actions(&self, actions: Vec) { + for action in actions { + match action { + EventCompletionAction::ReleaseRAAChannelMonitorUpdate { + channel_funding_outpoint, counterparty_node_id + } => { + self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None); + } + } + } + } + /// Processes any events asynchronously in the order they were generated since the last call /// using the given event handler. /// @@ -5920,7 +6283,7 @@ where fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut result = NotifyOption::SkipPersist; + let mut result = self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -6001,7 +6364,8 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -6035,7 +6399,8 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -6054,8 +6419,8 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -6098,7 +6463,8 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.get_funding_txo() { if funding_txo.txid == *txid { @@ -6342,84 +6708,97 @@ where L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); } + fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.temporary_channel_id.clone())), *counterparty_node_id); + } + fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); } + fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.temporary_channel_id.clone())), *counterparty_node_id); + } + fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_funding_created(counterparty_node_id, msg), *counterparty_node_id); } fn handle_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_funding_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_shutdown(counterparty_node_id, msg), *counterparty_node_id); } fn handle_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_closing_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fulfill_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_commitment_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_revoke_and_ack(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_announcement_signatures(counterparty_node_id, msg), *counterparty_node_id); } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let force_persist = self.process_background_events(); if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { - persist + if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } } else { NotifyOption::SkipPersist } @@ -6427,12 +6806,12 @@ where } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -6453,23 +6832,40 @@ where }); pending_msg_events.retain(|msg| { match msg { + // V1 Channel Establishment &events::MessageSendEvent::SendAcceptChannel { .. } => false, &events::MessageSendEvent::SendOpenChannel { .. } => false, &events::MessageSendEvent::SendFundingCreated { .. } => false, &events::MessageSendEvent::SendFundingSigned { .. } => false, + // V2 Channel Establishment + &events::MessageSendEvent::SendAcceptChannelV2 { .. } => false, + &events::MessageSendEvent::SendOpenChannelV2 { .. } => false, + // Common Channel Establishment &events::MessageSendEvent::SendChannelReady { .. } => false, &events::MessageSendEvent::SendAnnouncementSignatures { .. } => false, + // Interactive Transaction Construction + &events::MessageSendEvent::SendTxAddInput { .. } => false, + &events::MessageSendEvent::SendTxAddOutput { .. } => false, + &events::MessageSendEvent::SendTxRemoveInput { .. } => false, + &events::MessageSendEvent::SendTxRemoveOutput { .. } => false, + &events::MessageSendEvent::SendTxComplete { .. } => false, + &events::MessageSendEvent::SendTxSignatures { .. } => false, + &events::MessageSendEvent::SendTxInitRbf { .. } => false, + &events::MessageSendEvent::SendTxAckRbf { .. } => false, + &events::MessageSendEvent::SendTxAbort { .. } => false, + // Channel Operations &events::MessageSendEvent::UpdateHTLCs { .. } => false, &events::MessageSendEvent::SendRevokeAndACK { .. } => false, &events::MessageSendEvent::SendClosingSigned { .. } => false, &events::MessageSendEvent::SendShutdown { .. } => false, &events::MessageSendEvent::SendChannelReestablish { .. } => false, + &events::MessageSendEvent::HandleError { .. } => false, + // Gossip &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, - &events::MessageSendEvent::HandleError { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, &events::MessageSendEvent::SendShortIdsQuery { .. } => false, &events::MessageSendEvent::SendReplyChannelRange { .. } => false, @@ -6497,7 +6893,7 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); // If we have too many peers connected which don't have funded channels, disconnect the // peer immediately (as long as it doesn't have funded channels). If we have a bunch of @@ -6518,6 +6914,7 @@ where latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, })); }, @@ -6580,7 +6977,7 @@ where } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); if msg.channel_id == [0; 32] { let channel_ids: Vec<[u8; 32]> = { @@ -6626,6 +7023,64 @@ where fn provided_init_features(&self, _their_init_features: &PublicKey) -> InitFeatures { provided_init_features(&self.default_configuration) } + + fn get_genesis_hashes(&self) -> Option> { + Some(vec![ChainHash::from(&self.genesis_hash[..])]) + } + + fn handle_tx_add_input(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAddInput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_add_output(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAddOutput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_remove_input(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxRemoveInput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_remove_output(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxRemoveOutput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_complete(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxComplete) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxSignatures) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_init_rbf(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxInitRbf) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_ack_rbf(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAckRbf) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_abort(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAbort) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } } /// Fetches the set of [`NodeFeatures`] flags which are provided by or required by @@ -6663,7 +7118,7 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. let mut features = InitFeatures::empty(); - features.set_data_loss_protect_optional(); + features.set_data_loss_protect_required(); features.set_upfront_shutdown_script_optional(); features.set_variable_length_onion_required(); features.set_static_remote_key_required(); @@ -6722,10 +7177,9 @@ impl Writeable for ChannelDetails { (14, user_channel_id_low, required), (16, self.balance_msat, required), (18, self.outbound_capacity_msat, required), - // Note that by the time we get past the required read above, outbound_capacity_msat will be - // filled in, so we can safely unwrap it here. - (19, self.next_outbound_htlc_limit_msat, (default_value, outbound_capacity_msat.0.unwrap() as u64)), + (19, self.next_outbound_htlc_limit_msat, required), (20, self.inbound_capacity_msat, required), + (21, self.next_outbound_htlc_minimum_msat, required), (22, self.confirmations_required, option), (24, self.force_close_spend_delay, option), (26, self.is_outbound, required), @@ -6762,6 +7216,7 @@ impl Readable for ChannelDetails { // filled in, so we can safely unwrap it here. (19, next_outbound_htlc_limit_msat, (default_value, outbound_capacity_msat.0.unwrap() as u64)), (20, inbound_capacity_msat, required), + (21, next_outbound_htlc_minimum_msat, (default_value, 0)), (22, confirmations_required, option), (24, force_close_spend_delay, option), (26, is_outbound, required), @@ -6795,6 +7250,7 @@ impl Readable for ChannelDetails { balance_msat: balance_msat.0.unwrap(), outbound_capacity_msat: outbound_capacity_msat.0.unwrap(), next_outbound_htlc_limit_msat: next_outbound_htlc_limit_msat.0.unwrap(), + next_outbound_htlc_minimum_msat: next_outbound_htlc_minimum_msat.0.unwrap(), inbound_capacity_msat: inbound_capacity_msat.0.unwrap(), confirmations_required, confirmations, @@ -7007,25 +7463,29 @@ impl Readable for HTLCSource { let mut path_hops: Option> = Some(Vec::new()); let mut payment_id = None; let mut payment_params: Option = None; + let mut blinded_tail: Option = None; read_tlv_fields!(reader, { (0, session_priv, required), (1, payment_id, option), (2, first_hop_htlc_msat, required), (4, path_hops, vec_type), (5, payment_params, (option: ReadableArgs, 0)), + (6, blinded_tail, option), }); if payment_id.is_none() { // For backwards compat, if there was no payment_id written, use the session_priv bytes // instead. payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref())); } - let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)? }; + let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail }; if path.hops.len() == 0 { return Err(DecodeError::InvalidValue); } if let Some(params) = payment_params.as_mut() { - if params.final_cltv_expiry_delta == 0 { - params.final_cltv_expiry_delta = path.final_cltv_expiry_delta(); + if let Payee::Clear { ref mut final_cltv_expiry_delta, .. } = params.payee { + if final_cltv_expiry_delta == &0 { + *final_cltv_expiry_delta = path.final_cltv_expiry_delta().ok_or(DecodeError::InvalidValue)?; + } } } Ok(HTLCSource::OutboundRoute { @@ -7054,6 +7514,7 @@ impl Writeable for HTLCSource { // 3 was previously used to write a PaymentSecret for the payment. (4, path.hops, vec_type), (5, None::, option), // payment_params in LDK versions prior to 0.0.115 + (6, path.blinded_tail, option), }); } HTLCSource::PreviousHopData(ref field) => { @@ -7202,23 +7663,28 @@ where } let events = self.pending_events.lock().unwrap(); - (events.len() as u64).write(writer)?; - for event in events.iter() { - event.write(writer)?; - } - - let background_events = self.pending_background_events.lock().unwrap(); - (background_events.len() as u64).write(writer)?; - for event in background_events.iter() { - match event { - BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => { - 0u8.write(writer)?; - funding_txo.write(writer)?; - monitor_update.write(writer)?; - }, + // LDK versions prior to 0.0.115 don't support post-event actions, thus if there's no + // actions at all, skip writing the required TLV. Otherwise, pre-0.0.115 versions will + // refuse to read the new ChannelManager. + let events_not_backwards_compatible = events.iter().any(|(_, action)| action.is_some()); + if events_not_backwards_compatible { + // If we're gonna write a even TLV that will overwrite our events anyway we might as + // well save the space and not write any events here. + 0u64.write(writer)?; + } else { + (events.len() as u64).write(writer)?; + for (event, _) in events.iter() { + event.write(writer)?; } } + // LDK versions prior to 0.0.116 wrote the `pending_background_events` + // `MonitorUpdateRegeneratedOnStartup`s here, however there was never a reason to do so - + // the closing monitor updates were always effectively replayed on startup (either directly + // by calling `broadcast_latest_holder_commitment_txn` on a `ChannelMonitor` during + // deserialization or, in 0.0.115, by regenerating the monitor update itself). + 0u64.write(writer)?; + // Prior to 0.0.111 we tracked node_announcement serials here, however that now happens in // `PeerManager`, and thus we simply write the `highest_seen_timestamp` twice, which is // likely to be identical. @@ -7285,6 +7751,7 @@ where (5, self.our_network_pubkey, required), (6, monitor_update_blocked_actions_per_peer, option), (7, self.fake_scid_rand_bytes, required), + (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), (9, htlc_purposes, vec_type), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), @@ -7294,6 +7761,47 @@ where } } +impl Writeable for VecDeque<(Event, Option)> { + fn write(&self, w: &mut W) -> Result<(), io::Error> { + (self.len() as u64).write(w)?; + for (event, action) in self.iter() { + event.write(w)?; + action.write(w)?; + #[cfg(debug_assertions)] { + // Events are MaybeReadable, in some cases indicating that they shouldn't actually + // be persisted and are regenerated on restart. However, if such an event has a + // post-event-handling action we'll write nothing for the event and would have to + // either forget the action or fail on deserialization (which we do below). Thus, + // check that the event is sane here. + let event_encoded = event.encode(); + let event_read: Option = + MaybeReadable::read(&mut &event_encoded[..]).unwrap(); + if action.is_some() { assert!(event_read.is_some()); } + } + } + Ok(()) + } +} +impl Readable for VecDeque<(Event, Option)> { + fn read(reader: &mut R) -> Result { + let len: u64 = Readable::read(reader)?; + const MAX_ALLOC_SIZE: u64 = 1024 * 16; + let mut events: Self = VecDeque::with_capacity(cmp::min( + MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option)>() as u64, + len) as usize); + for _ in 0..len { + let ev_opt = MaybeReadable::read(reader)?; + let action = Readable::read(reader)?; + if let Some(ev) = ev_opt { + events.push_back((ev, action)); + } else if action.is_some() { + return Err(DecodeError::InvalidValue); + } + } + Ok(events) + } +} + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -7460,7 +7968,7 @@ where let mut peer_channels: HashMap::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); - let mut channel_closures = Vec::new(); + let mut channel_closures = VecDeque::new(); let mut pending_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( @@ -7469,14 +7977,11 @@ where let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_cur_holder_commitment_transaction_number() < monitor.get_cur_holder_commitment_number() || - channel.get_revoked_counterparty_commitment_transaction_number() < monitor.get_min_seen_secret() || - channel.get_cur_counterparty_commitment_transaction_number() < monitor.get_cur_counterparty_commitment_number() || - channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() { + if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { // If the channel is ahead of the monitor, return InvalidValue: log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); + log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); @@ -7492,15 +7997,17 @@ where log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.force_shutdown(true); - if let Some(monitor_update) = monitor_update { - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update)); + if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { + pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, funding_txo, update + }); } failed_htlcs.append(&mut new_failed_htlcs); - channel_closures.push(events::Event::ChannelClosed { + channel_closures.push_back((events::Event::ChannelClosed { channel_id: channel.channel_id(), user_channel_id: channel.get_user_id(), reason: ClosureReason::OutdatedChannelManager - }); + }, None)); for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() { let mut found_htlc = false; for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() { @@ -7521,7 +8028,10 @@ where } } } else { - log_info!(args.logger, "Successfully loaded channel {}", log_bytes!(channel.channel_id())); + log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", + log_bytes!(channel.channel_id()), channel.get_latest_monitor_update_id(), + monitor.get_latest_update_id()); + channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.get_counterparty_node_id(), channel.channel_id())); } @@ -7545,11 +8055,11 @@ where // was in-progress, we never broadcasted the funding transaction and can still // safely discard the channel. let _ = channel.force_shutdown(false); - channel_closures.push(events::Event::ChannelClosed { + channel_closures.push_back((events::Event::ChannelClosed { channel_id: channel.channel_id(), user_channel_id: channel.get_user_id(), reason: ClosureReason::DisconnectedPeer, - }); + }, None)); } else { log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id())); log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); @@ -7562,11 +8072,13 @@ where for (funding_txo, _) in args.channel_monitors.iter() { if !funding_txo_set.contains(funding_txo) { + log_info!(args.logger, "Queueing monitor update to ensure missing channel {} is force closed", + log_bytes!(funding_txo.to_channel_id())); let monitor_update = ChannelMonitorUpdate { update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((*funding_txo, monitor_update))); + pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -7604,16 +8116,18 @@ where latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } let event_count: u64 = Readable::read(reader)?; - let mut pending_events_read: Vec = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::())); + let mut pending_events_read: VecDeque<(events::Event, Option)> = + VecDeque::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option)>())); for _ in 0..event_count { match MaybeReadable::read(reader)? { - Some(event) => pending_events_read.push(event), + Some(event) => pending_events_read.push_back((event, None)), None => continue, } } @@ -7622,18 +8136,34 @@ where for _ in 0..background_event_count { match ::read(reader)? { 0 => { - let (funding_txo, monitor_update): (OutPoint, ChannelMonitorUpdate) = (Readable::read(reader)?, Readable::read(reader)?); - if pending_background_events.iter().find(|e| { - let BackgroundEvent::ClosingMonitorUpdate((pending_funding_txo, pending_monitor_update)) = e; - *pending_funding_txo == funding_txo && *pending_monitor_update == monitor_update - }).is_none() { - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update))); - } + // LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here, + // however we really don't (and never did) need them - we regenerate all + // on-startup monitor updates. + let _: OutPoint = Readable::read(reader)?; + let _: ChannelMonitorUpdate = Readable::read(reader)?; } _ => return Err(DecodeError::InvalidValue), } } + for (node_id, peer_mtx) in per_peer_state.iter() { + let peer_state = peer_mtx.lock().unwrap(); + for (_, chan) in peer_state.channel_by_id.iter() { + for update in chan.uncompleted_unblocked_mon_updates() { + if let Some(funding_txo) = chan.get_funding_txo() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", + update.update_id, log_bytes!(funding_txo.to_channel_id())); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: *node_id, funding_txo, update: update.clone(), + }); + } else { + return Err(DecodeError::InvalidValue); + } + } + } + } + let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; @@ -7668,7 +8198,8 @@ where let mut claimable_htlc_purposes = None; let mut claimable_htlc_onion_fields = None; let mut pending_claiming_payments = Some(HashMap::new()); - let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); + let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); + let mut events_override = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -7677,6 +8208,7 @@ where (5, received_network_pubkey, option), (6, monitor_update_blocked_actions_per_peer, option), (7, fake_scid_rand_bytes, option), + (8, events_override, option), (9, claimable_htlc_purposes, vec_type), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), @@ -7689,6 +8221,10 @@ where probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes()); } + if let Some(events) = events_override { + pending_events_read = events; + } + if !channel_closures.is_empty() { pending_events_read.append(&mut channel_closures); } @@ -7784,7 +8320,7 @@ where if pending_forward_matches_htlc(&htlc_info) { log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); - pending_events_read.retain(|event| { + pending_events_read.retain(|(event, _)| { if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event { intercepted_id != ev_id } else { true } @@ -7820,9 +8356,9 @@ where // shut down before the timer hit. Either way, set the time_forwardable to a small // constant as enough time has likely passed that we should simply handle the forwards // now, or at least after the user gets a chance to reconnect to our peers. - pending_events_read.push(events::Event::PendingHTLCsForwardable { + pending_events_read.push_back((events::Event::PendingHTLCsForwardable { time_forwardable: Duration::from_secs(2), - }); + }, None)); } let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material(); @@ -7976,18 +8512,32 @@ where previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger); } } - pending_events_read.push(events::Event::PaymentClaimed { + pending_events_read.push_back((events::Event::PaymentClaimed { receiver_node_id, payment_hash, purpose: payment.purpose, amount_msat: claimable_amt_msat, - }); + }, None)); } } } for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() { - if let Some(peer_state) = per_peer_state.get_mut(&node_id) { + if let Some(peer_state) = per_peer_state.get(&node_id) { + for (_, actions) in monitor_update_blocked_actions.iter() { + for action in actions.iter() { + if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + downstream_counterparty_and_funding_outpoint: + Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), .. + } = action { + if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) { + blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates + .entry(blocked_channel_outpoint.to_channel_id()) + .or_insert_with(Vec::new).push(blocking_action.clone()); + } + } + } + } peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions; } else { log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id); @@ -8026,8 +8576,11 @@ where per_peer_state: FairRwLock::new(per_peer_state), pending_events: Mutex::new(pending_events_read), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), + #[cfg(debug_assertions)] + background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), entropy_source: args.entropy_source, @@ -8057,8 +8610,6 @@ mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - #[cfg(feature = "std")] - use core::time::Duration; use core::sync::atomic::Ordering; use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; @@ -8069,8 +8620,8 @@ mod tests { use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; use crate::util::test_utils; - use crate::util::config::ChannelConfig; - use crate::chain::keysinterface::EntropySource; + use crate::util::config::{ChannelConfig, ChannelConfigUpdate}; + use crate::sign::EntropySource; #[test] fn test_notify_limits() { @@ -8301,7 +8852,7 @@ mod tests { }; let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &random_seed_bytes + None, nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -8335,7 +8886,7 @@ mod tests { let payment_preimage = PaymentPreimage([42; 32]); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &random_seed_bytes + None, nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -8398,7 +8949,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &random_seed_bytes + nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -8442,7 +8993,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &random_seed_bytes + nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -8788,12 +9339,14 @@ mod tests { &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); peer_pks.push(random_pk); nodes[1].node.peer_connected(&random_pk, &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); } let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); nodes[1].node.peer_connected(&last_random_pk, &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap_err(); // Also importantly, because nodes[0] isn't "protected", we will refuse a reconnection from // them if we have too many un-channel'd peers. @@ -8804,13 +9357,16 @@ mod tests { if let Event::ChannelClosed { .. } = ev { } else { panic!(); } } nodes[1].node.peer_connected(&last_random_pk, &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap_err(); // but of course if the connection is outbound its allowed... nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, false).unwrap(); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); // Now nodes[0] is disconnected but still has a pending, un-funded channel lying around. @@ -8834,7 +9390,8 @@ mod tests { // "protected" and can connect again. mine_transaction(&nodes[1], funding_tx.as_ref().unwrap()); nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id()); // Further, because the first channel was funded, we can open another channel with @@ -8899,7 +9456,8 @@ mod tests { let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); nodes[1].node.peer_connected(&random_pk, &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); nodes[1].node.handle_open_channel(&random_pk, &open_channel_msg); let events = nodes[1].node.get_and_clear_pending_events(); @@ -8917,7 +9475,8 @@ mod tests { let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx, &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap()); nodes[1].node.peer_connected(&last_random_pk, &msgs::Init { - features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap(); + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg); let events = nodes[1].node.get_and_clear_pending_events(); match events[0] { @@ -8980,13 +9539,69 @@ mod tests { check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed); } + + #[test] + fn test_update_channel_config() { + let chanmon_cfg = create_chanmon_cfgs(2); + let node_cfg = create_node_cfgs(2, &chanmon_cfg); + let mut user_config = test_default_channel_config(); + let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[Some(user_config), Some(user_config)]); + let nodes = create_network(2, &node_cfg, &node_chanmgr); + let _ = create_announced_chan_between_nodes(&nodes, 0, 1); + let channel = &nodes[0].node.list_channels()[0]; + + nodes[0].node.update_channel_config(&channel.counterparty.node_id, &[channel.channel_id], &user_config.channel_config).unwrap(); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 0); + + user_config.channel_config.forwarding_fee_base_msat += 10; + nodes[0].node.update_channel_config(&channel.counterparty.node_id, &[channel.channel_id], &user_config.channel_config).unwrap(); + assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_base_msat, user_config.channel_config.forwarding_fee_base_msat); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("expected BroadcastChannelUpdate event"), + } + + nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id], &ChannelConfigUpdate::default()).unwrap(); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 0); + + let new_cltv_expiry_delta = user_config.channel_config.cltv_expiry_delta + 6; + nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id], &ChannelConfigUpdate { + cltv_expiry_delta: Some(new_cltv_expiry_delta), + ..Default::default() + }).unwrap(); + assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().cltv_expiry_delta, new_cltv_expiry_delta); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("expected BroadcastChannelUpdate event"), + } + + let new_fee = user_config.channel_config.forwarding_fee_proportional_millionths + 100; + nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id], &ChannelConfigUpdate { + forwarding_fee_proportional_millionths: Some(new_fee), + ..Default::default() + }).unwrap(); + assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().cltv_expiry_delta, new_cltv_expiry_delta); + assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths, new_fee); + let events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, + _ => panic!("expected BroadcastChannelUpdate event"), + } + } } -#[cfg(all(any(test, feature = "_test_utils"), feature = "_bench_unstable"))] +#[cfg(ldk_bench)] pub mod bench { use crate::chain::Listen; use crate::chain::chainmonitor::{ChainMonitor, Persist}; - use crate::chain::keysinterface::{KeysManager, InMemorySigner}; + use crate::sign::{KeysManager, InMemorySigner}; use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use crate::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId, RecipientOnionFields, Retry}; use crate::ln::functional_test_utils::*; @@ -9002,7 +9617,7 @@ pub mod bench { use crate::sync::{Arc, Mutex}; - use test::Bencher; + use criterion::Criterion; type Manager<'a, P> = ChannelManager< &'a ChainMonitor Option<&test_utils::TestChainMonitor> { None } } - #[cfg(test)] - #[bench] - fn bench_sends(bench: &mut Bencher) { - bench_two_sends(bench, test_utils::TestPersister::new(), test_utils::TestPersister::new()); + pub fn bench_sends(bench: &mut Criterion) { + bench_two_sends(bench, "bench_sends", test_utils::TestPersister::new(), test_utils::TestPersister::new()); } - pub fn bench_two_sends>(bench: &mut Bencher, persister_a: P, persister_b: P) { + pub fn bench_two_sends>(bench: &mut Criterion, bench_name: &str, persister_a: P, persister_b: P) { // Do a simple benchmark of sending a payment back and forth between two nodes. // Note that this is unrealistic as each payment send will require at least two fsync // calls per node. let network = bitcoin::Network::Testnet; - let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; + let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); let scorer = Mutex::new(test_utils::TestScorer::new()); @@ -9063,8 +9676,12 @@ pub mod bench { }); let node_b_holder = ANodeHolder { node: &node_b }; - node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }, true).unwrap(); - node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }, false).unwrap(); + node_a.peer_connected(&node_b.get_our_node_id(), &Init { + features: node_b.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); + node_b.peer_connected(&node_a.get_our_node_id(), &Init { + features: node_a.init_features(), networks: None, remote_network_address: None + }, false).unwrap(); node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap(); node_b.handle_open_channel(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id())); node_a.handle_accept_channel(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id())); @@ -9099,10 +9716,7 @@ pub mod bench { assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]); - let block = Block { - header: BlockHeader { version: 0x20000000, prev_blockhash: BestBlock::from_network(network).block_hash(), merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 }, - txdata: vec![tx], - }; + let block = create_dummy_block(BestBlock::from_network(network).block_hash(), 42, vec![tx]); Listen::block_connected(&node_a, &block, 1); Listen::block_connected(&node_b, &block, 1); @@ -9143,7 +9757,7 @@ pub mod bench { macro_rules! send_payment { ($node_a: expr, $node_b: expr) => { let payment_params = PaymentParameters::from_node_id($node_b.get_our_node_id(), TEST_FINAL_CLTV) - .with_features($node_b.invoice_features()); + .with_bolt11_features($node_b.invoice_features()).unwrap(); let mut payment_preimage = PaymentPreimage([0; 32]); payment_preimage.0[0..8].copy_from_slice(&payment_count.to_le_bytes()); payment_count += 1; @@ -9185,9 +9799,9 @@ pub mod bench { } } - bench.iter(|| { + bench.bench_function(bench_name, |b| b.iter(|| { send_payment!(node_a, node_b); send_payment!(node_b, node_a); - }); + })); } }