From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Sat, 15 May 2021 00:44:40 +0000 (+0000) Subject: Merge pull request #916 from TheBlueMatt/2021-05-fix-disabled-announcements X-Git-Tag: v0.0.98~25 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=e0986de47796848efa1619624dde0fe6e9908d81;hp=eeabac8ccb6d9339228910145711bd5510bd0c4b;p=rust-lightning Merge pull request #916 from TheBlueMatt/2021-05-fix-disabled-announcements Avoid persisting a ChannelManager after each timer tick and send update_channel re-enable messages --- diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 413dc831..a2137548 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -248,18 +248,21 @@ const MULTI_STATE_FLAGS: u32 = BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisc pub const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1; -/// Liveness is called to fluctuate given peer disconnecton/monitor failures/closing. -/// If channel is public, network should have a liveness view announced by us on a -/// best-effort, which means we may filter out some status transitions to avoid spam. -/// See further timer_tick_occurred. -#[derive(PartialEq)] -enum UpdateStatus { - /// Status has been gossiped. - Fresh, - /// Status has been changed. - DisabledMarked, - /// Status has been marked to be gossiped at next flush +/// The "channel disabled" bit in channel_update must be set based on whether we are connected to +/// our counterparty or not. However, we don't want to announce updates right away to avoid +/// spamming the network with updates if the connection is flapping. Instead, we "stage" updates to +/// our channel_update message and track the current state here. +/// See implementation at [`super::channelmanager::ChannelManager::timer_tick_occurred`]. +#[derive(Clone, Copy, PartialEq)] +pub(super) enum ChannelUpdateStatus { + /// We've announced the channel as enabled and are connected to our peer. + Enabled, + /// Our channel is no longer live, but we haven't announced the channel as disabled yet. DisabledStaged, + /// Our channel is live again, but we haven't announced the channel as enabled yet. + EnabledStaged, + /// We've announced the channel as disabled. + Disabled, } /// An enum indicating whether the local or remote side offered a given HTLC. @@ -416,7 +419,7 @@ pub(super) struct Channel { commitment_secrets: CounterpartyCommitmentSecrets, - network_sync: UpdateStatus, + channel_update_status: ChannelUpdateStatus, // We save these values so we can make sure `next_local_commit_tx_fee_msat` and // `next_remote_commit_tx_fee_msat` properly predict what the next commitment transaction fee will @@ -617,7 +620,7 @@ impl Channel { commitment_secrets: CounterpartyCommitmentSecrets::new(), - network_sync: UpdateStatus::Fresh, + channel_update_status: ChannelUpdateStatus::Enabled, #[cfg(any(test, feature = "fuzztarget"))] next_local_commitment_tx_fee_info_cached: Mutex::new(None), @@ -858,7 +861,7 @@ impl Channel { commitment_secrets: CounterpartyCommitmentSecrets::new(), - network_sync: UpdateStatus::Fresh, + channel_update_status: ChannelUpdateStatus::Enabled, #[cfg(any(test, feature = "fuzztarget"))] next_local_commitment_tx_fee_info_cached: Mutex::new(None), @@ -3495,24 +3498,12 @@ impl Channel { } else { false } } - pub fn to_disabled_staged(&mut self) { - self.network_sync = UpdateStatus::DisabledStaged; + pub fn channel_update_status(&self) -> ChannelUpdateStatus { + self.channel_update_status } - pub fn to_disabled_marked(&mut self) { - self.network_sync = UpdateStatus::DisabledMarked; - } - - pub fn to_fresh(&mut self) { - self.network_sync = UpdateStatus::Fresh; - } - - pub fn is_disabled_staged(&self) -> bool { - self.network_sync == UpdateStatus::DisabledStaged - } - - pub fn is_disabled_marked(&self) -> bool { - self.network_sync == UpdateStatus::DisabledMarked + pub fn set_channel_update_status(&mut self, status: ChannelUpdateStatus) { + self.channel_update_status = status; } fn check_get_funding_locked(&mut self, height: u32) -> Option { @@ -4375,6 +4366,31 @@ impl Readable for InboundHTLCRemovalReason { } } +impl Writeable for ChannelUpdateStatus { + fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + // We only care about writing out the current state as it was announced, ie only either + // Enabled or Disabled. In the case of DisabledStaged, we most recently announced the + // channel as enabled, so we write 0. For EnabledStaged, we similarly write a 1. + match self { + ChannelUpdateStatus::Enabled => 0u8.write(writer)?, + ChannelUpdateStatus::DisabledStaged => 0u8.write(writer)?, + ChannelUpdateStatus::EnabledStaged => 1u8.write(writer)?, + ChannelUpdateStatus::Disabled => 1u8.write(writer)?, + } + Ok(()) + } +} + +impl Readable for ChannelUpdateStatus { + fn read(reader: &mut R) -> Result { + Ok(match ::read(reader)? { + 0 => ChannelUpdateStatus::Enabled, + 1 => ChannelUpdateStatus::Disabled, + _ => return Err(DecodeError::InvalidValue), + }) + } +} + impl Writeable for Channel { fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { // Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been @@ -4568,6 +4584,8 @@ impl Writeable for Channel { self.counterparty_shutdown_scriptpubkey.write(writer)?; self.commitment_secrets.write(writer)?; + + self.channel_update_status.write(writer)?; Ok(()) } } @@ -4740,6 +4758,8 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel let counterparty_shutdown_scriptpubkey = Readable::read(reader)?; let commitment_secrets = Readable::read(reader)?; + let channel_update_status = Readable::read(reader)?; + let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&keys_source.get_secure_random_bytes()); @@ -4814,7 +4834,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel commitment_secrets, - network_sync: UpdateStatus::Fresh, + channel_update_status, #[cfg(any(test, feature = "fuzztarget"))] next_local_commitment_tx_fee_info_cached: Mutex::new(None), diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index a0168bdc..54bd3c89 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -45,7 +45,7 @@ use chain::transaction::{OutPoint, TransactionData}; // construct one themselves. use ln::{PaymentHash, PaymentPreimage, PaymentSecret}; pub use ln::channel::CounterpartyForwardingInfo; -use ln::channel::{Channel, ChannelError}; +use ln::channel::{Channel, ChannelError, ChannelUpdateStatus}; use ln::features::{InitFeatures, NodeFeatures}; use routing::router::{Route, RouteHop}; use ln::msgs; @@ -468,8 +468,8 @@ pub struct ChannelManager, persistence_notifier: PersistenceNotifier, @@ -522,32 +522,50 @@ impl BestBlock { pub fn height(&self) -> u32 { self.height } } +#[derive(Copy, Clone, PartialEq)] +enum NotifyOption { + DoPersist, + SkipPersist, +} + /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is /// desirable to notify any listeners on `await_persistable_update_timeout`/ -/// `await_persistable_update` that new updates are available for persistence. Therefore, this +/// `await_persistable_update` when new updates are available for persistence. Therefore, this /// struct is responsible for locking the total consistency lock and, upon going out of scope, /// sending the aforementioned notification (since the lock being released indicates that the /// updates are ready for persistence). -struct PersistenceNotifierGuard<'a> { +/// +/// We allow callers to either always notify by constructing with `notify_on_drop` or choose to +/// notify or not based on whether relevant changes have been made, providing a closure to +/// `optionally_notify` which returns a `NotifyOption`. +struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { persistence_notifier: &'a PersistenceNotifier, + should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, } -impl<'a> PersistenceNotifierGuard<'a> { - fn new(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> Self { +impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused + fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist }) + } + + fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { let read_guard = lock.read().unwrap(); - Self { + PersistenceNotifierGuard { persistence_notifier: notifier, + should_persist: persist_check, _read_guard: read_guard, } } } -impl<'a> Drop for PersistenceNotifierGuard<'a> { +impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { - self.persistence_notifier.notify(); + if (self.should_persist)() == NotifyOption::DoPersist { + self.persistence_notifier.notify(); + } } } @@ -943,7 +961,7 @@ impl ChannelMana let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?; let res = channel.get_open_channel(self.genesis_hash.clone()); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); // We want to make sure the lock is actually acquired by PersistenceNotifierGuard. debug_assert!(&self.total_consistency_lock.try_write().is_err()); @@ -1024,7 +1042,7 @@ impl ChannelMana /// /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -1114,7 +1132,7 @@ impl ChannelMana /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager. pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); match self.force_close_channel_with_peer(channel_id, None) { Ok(counterparty_node_id) => { self.channel_state.lock().unwrap().pending_msg_events.push( @@ -1459,7 +1477,7 @@ impl ChannelMana } let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let err: Result<(), _> = loop { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -1686,7 +1704,7 @@ impl ChannelMana /// not currently support replacing a funding transaction on an existing channel. Instead, /// create a new channel with a conflicting funding transaction. pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); for inp in funding_transaction.input.iter() { if inp.witness.is_empty() { @@ -1769,7 +1787,7 @@ impl ChannelMana /// /// Panics if addresses is absurdly large (more than 500). pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); if addresses.len() > 500 { panic!("More than half the message size was taken up by public addresses!"); @@ -1803,7 +1821,7 @@ impl ChannelMana /// Should only really ever be called in response to a PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut new_events = Vec::new(); let mut failed_forwards = Vec::new(); @@ -2094,9 +2112,13 @@ impl ChannelMana /// BroadcastChannelUpdate events in timer_tick_occurred. /// /// Expects the caller to have a total_consistency_lock read lock. - fn process_background_events(&self) { + fn process_background_events(&self) -> bool { let mut background_events = Vec::new(); mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); + if background_events.is_empty() { + return false; + } + for event in background_events.drain(..) { match event { BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => { @@ -2106,6 +2128,7 @@ impl ChannelMana }, } } + true } #[cfg(any(test, feature = "_test_utils"))] @@ -2121,25 +2144,42 @@ impl ChannelMana /// /// Note that in some rare cases this may generate a `chain::Watch::update_channel` call. pub fn timer_tick_occurred(&self) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let mut should_persist = NotifyOption::SkipPersist; + if self.process_background_events() { should_persist = NotifyOption::DoPersist; } - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - for (_, chan) in channel_state.by_id.iter_mut() { - if chan.is_disabled_staged() && !chan.is_live() { - if let Ok(update) = self.get_channel_update(&chan) { - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; + for (_, chan) in channel_state.by_id.iter_mut() { + match chan.channel_update_status() { + ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged), + ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged), + ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), + ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), + ChannelUpdateStatus::DisabledStaged if !chan.is_live() => { + if let Ok(update) = self.get_channel_update(&chan) { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + chan.set_channel_update_status(ChannelUpdateStatus::Disabled); + }, + ChannelUpdateStatus::EnabledStaged if chan.is_live() => { + if let Ok(update) = self.get_channel_update(&chan) { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + chan.set_channel_update_status(ChannelUpdateStatus::Enabled); + }, + _ => {}, } - chan.to_fresh(); - } else if chan.is_disabled_staged() && chan.is_live() { - chan.to_fresh(); - } else if chan.is_disabled_marked() { - chan.to_disabled_staged(); } - } + + should_persist + }); } /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect @@ -2148,7 +2188,7 @@ impl ChannelMana /// Returns false if no payment was found to fail backwards, true if the process of failing the /// HTLC backwards has been started. pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) -> bool { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash); @@ -2328,7 +2368,7 @@ impl ChannelMana pub fn claim_funds(&self, payment_preimage: PaymentPreimage) -> bool { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&payment_hash); @@ -2512,7 +2552,7 @@ impl ChannelMana /// 4) once all remote copies are updated, you call this function with the update_id that /// completed, and once it is the latest the Channel will be re-enabled. pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); @@ -3283,7 +3323,7 @@ impl ChannelMana /// (C-not exported) Cause its doc(hidden) anyway #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u32) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let counterparty_node_id; let err: Result<(), _> = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -3407,7 +3447,7 @@ impl ChannelMana let payment_secret = PaymentSecret(self.keys_manager.get_secure_random_bytes()); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut payment_secrets = self.pending_inbound_payments.lock().unwrap(); match payment_secrets.entry(payment_hash) { hash_map::Entry::Vacant(e) => { @@ -3564,7 +3604,7 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -3595,7 +3635,7 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, &self.logger).map(|a| (a, Vec::new()))); } @@ -3607,7 +3647,7 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); @@ -3649,7 +3689,7 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.get_funding_txo() { if funding_txo.txid == *txid { @@ -3795,92 +3835,92 @@ impl L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, their_features, msg), *counterparty_node_id); } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, their_features, msg), *counterparty_node_id); } fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_funding_created(counterparty_node_id, msg), *counterparty_node_id); } fn handle_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_funding_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_funding_locked(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingLocked) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_funding_locked(counterparty_node_id, msg), *counterparty_node_id); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, their_features: &InitFeatures, msg: &msgs::Shutdown) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_shutdown(counterparty_node_id, their_features, msg), *counterparty_node_id); } fn handle_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_closing_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_update_fulfill_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_commitment_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_revoke_and_ack(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_announcement_signatures(counterparty_node_id, msg), *counterparty_node_id); } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id); } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); let mut failed_payments = Vec::new(); let mut no_channels_remain = true; @@ -3916,7 +3956,6 @@ impl // on peer disconnect here, there will need to be corresponding changes in // reestablish logic. let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); - chan.to_disabled_marked(); if !failed_adds.is_empty() { let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe failed_payments.push((chan_update, failed_adds)); @@ -3974,7 +4013,7 @@ impl fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) { log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); { let mut peer_state_lock = self.per_peer_state.write().unwrap(); @@ -4014,7 +4053,7 @@ impl } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); if msg.channel_id == [0; 32] { for chan in self.list_channels() { @@ -4049,6 +4088,10 @@ impl PersistenceNotifier { loop { let &(ref mtx, ref cvar) = &self.persistence_lock; let mut guard = mtx.lock().unwrap(); + if *guard { + *guard = false; + return; + } guard = cvar.wait(guard).unwrap(); let result = *guard; if result { @@ -4064,6 +4107,10 @@ impl PersistenceNotifier { loop { let &(ref mtx, ref cvar) = &self.persistence_lock; let mut guard = mtx.lock().unwrap(); + if *guard { + *guard = false; + return true; + } guard = cvar.wait_timeout(guard, max_wait).unwrap().0; // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the // desired wait time has actually passed, and if not then restart the loop with a reduced wait diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 4a94b6c6..10270e68 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -7582,16 +7582,17 @@ fn test_announce_disable_channels() { nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); - nodes[0].node.timer_tick_occurred(); // dirty -> stagged - nodes[0].node.timer_tick_occurred(); // staged -> fresh + nodes[0].node.timer_tick_occurred(); // Enabled -> DisabledStaged + nodes[0].node.timer_tick_occurred(); // DisabledStaged -> Disabled let msg_events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(msg_events.len(), 3); + let mut chans_disabled: HashSet = [short_id_1, short_id_2, short_id_3].iter().map(|a| *a).collect(); for e in msg_events { match e { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { - let short_id = msg.contents.short_channel_id; - // Check generated channel_update match list in PendingChannelUpdate - if short_id != short_id_1 && short_id != short_id_2 && short_id != short_id_3 { + assert_eq!(msg.contents.flags & (1<<1), 1<<1); // The "channel disabled" bit should be set + // Check that each channel gets updated exactly once + if !chans_disabled.remove(&msg.contents.short_channel_id) { panic!("Generated ChannelUpdate for wrong chan!"); } }, @@ -7624,6 +7625,22 @@ fn test_announce_disable_channels() { nodes[0].node.timer_tick_occurred(); assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + nodes[0].node.timer_tick_occurred(); + let msg_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(msg_events.len(), 3); + chans_disabled = [short_id_1, short_id_2, short_id_3].iter().map(|a| *a).collect(); + for e in msg_events { + match e { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { + assert_eq!(msg.contents.flags & (1<<1), 0); // The "channel disabled" bit should be off + // Check that each channel gets updated exactly once + if !chans_disabled.remove(&msg.contents.short_channel_id) { + panic!("Generated ChannelUpdate for wrong chan!"); + } + }, + _ => panic!("Unexpected event"), + } + } } #[test]