use ln::features::{ChannelTypeFeatures, InitFeatures, NodeFeatures};
use routing::router::{PaymentParameters, Route, RouteHop, RoutePath, RouteParameters};
use ln::msgs;
-use ln::msgs::NetAddress;
use ln::onion_utils;
use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VALUE_MSAT};
use ln::wire::Encode;
use util::config::{UserConfig, ChannelConfig};
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
use util::{byte_utils, events};
+use util::wakers::{Future, Notifier};
use util::scid_utils::fake_scid;
use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
use util::logger::{Level, Logger};
use core::{cmp, mem};
use core::cell::RefCell;
use io::Read;
-use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
+use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::time::Duration;
use core::ops::Deref;
-#[cfg(any(test, feature = "std"))]
-use std::time::Instant;
-use util::crypto::sign;
-
// We hold various information about HTLC relay in the HTLC objects in Channel itself:
//
// Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should
/// keeping additional state.
probing_cookie_secret: [u8; 32],
- /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
- /// value increases strictly since we don't assume access to a time source.
- last_node_announcement_serial: AtomicUsize,
-
/// The highest block timestamp we've seen, which is usually a good guess at the current time.
/// Assuming most miners are generating blocks with reasonable timestamps, this shouldn't be
/// very far in the past, and can only ever be up to two hours in the future.
/// Taken first everywhere where we are making changes before any other locks.
/// When acquiring this lock in read mode, rather than acquiring it directly, call
/// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
- /// PersistenceNotifier the lock contains sends out a notification when the lock is released.
+ /// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,
- persistence_notifier: PersistenceNotifier,
+ persistence_notifier: Notifier,
keys_manager: K,
/// notify or not based on whether relevant changes have been made, providing a closure to
/// `optionally_notify` which returns a `NotifyOption`.
struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
- persistence_notifier: &'a PersistenceNotifier,
+ persistence_notifier: &'a Notifier,
should_persist: F,
// We hold onto this result so the lock doesn't get released immediately.
_read_guard: RwLockReadGuard<'a, ()>,
}
impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
- fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+ fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist })
}
- fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
+ fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
let read_guard = lock.read().unwrap();
PersistenceNotifierGuard {
probing_cookie_secret: keys_manager.get_secure_random_bytes(),
- last_node_announcement_serial: AtomicUsize::new(0),
highest_seen_timestamp: AtomicUsize::new(0),
per_peer_state: RwLock::new(HashMap::new()),
pending_events: Mutex::new(Vec::new()),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
- persistence_notifier: PersistenceNotifier::new(),
+ persistence_notifier: Notifier::new(),
keys_manager,
})
}
- #[allow(dead_code)]
- // Messages of up to 64KB should never end up more than half full with addresses, as that would
- // be absurd. We ensure this by checking that at least 100 (our stated public contract on when
- // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
- // message...
- const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
- #[deny(const_err)]
- #[allow(dead_code)]
- // ...by failing to compile if the number of addresses that would be half of a message is
- // smaller than 100:
- const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 100;
-
- /// Regenerates channel_announcements and generates a signed node_announcement from the given
- /// arguments, providing them in corresponding events via
- /// [`get_and_clear_pending_msg_events`], if at least one public channel has been confirmed
- /// on-chain. This effectively re-broadcasts all channel announcements and sends our node
- /// announcement to ensure that the lightning P2P network is aware of the channels we have and
- /// our network addresses.
- ///
- /// `rgb` is a node "color" and `alias` is a printable human-readable string to describe this
- /// node to humans. They carry no in-protocol meaning.
- ///
- /// `addresses` represent the set (possibly empty) of socket addresses on which this node
- /// accepts incoming connections. These will be included in the node_announcement, publicly
- /// tying these addresses together and to this node. If you wish to preserve user privacy,
- /// addresses should likely contain only Tor Onion addresses.
- ///
- /// Panics if `addresses` is absurdly large (more than 100).
- ///
- /// [`get_and_clear_pending_msg_events`]: MessageSendEventsProvider::get_and_clear_pending_msg_events
- pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec<NetAddress>) {
- let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-
- if addresses.len() > 100 {
- panic!("More than half the message size was taken up by public addresses!");
- }
-
- // While all existing nodes handle unsorted addresses just fine, the spec requires that
- // addresses be sorted for future compatibility.
- addresses.sort_by_key(|addr| addr.get_id());
-
- let announcement = msgs::UnsignedNodeAnnouncement {
- features: NodeFeatures::known(),
- timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
- node_id: self.get_our_node_id(),
- rgb, alias, addresses,
- excess_address_data: Vec::new(),
- excess_data: Vec::new(),
- };
- let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
- let node_announce_sig = sign(&self.secp_ctx, &msghash, &self.our_network_key);
-
- let mut channel_state_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_state_lock;
-
- let mut announced_chans = false;
- for (_, chan) in channel_state.by_id.iter() {
- if let Some(msg) = chan.get_signed_channel_announcement(self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height()) {
- channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
- msg,
- update_msg: match self.get_channel_update_for_broadcast(chan) {
- Ok(msg) => msg,
- Err(_) => continue,
- },
- });
- announced_chans = true;
- } else {
- // If the channel is not public or has not yet reached channel_ready, check the
- // next channel. If we don't yet have any public channels, we'll skip the broadcast
- // below as peers may not accept it without channels on chain first.
- }
- }
-
- if announced_chans {
- channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
- msg: msgs::NodeAnnouncement {
- signature: node_announce_sig,
- contents: announcement
- },
- });
- }
- }
-
/// Atomically updates the [`ChannelConfig`] for the given channels.
///
/// Once the updates are applied, each eligible channel (advertised with a known short channel
counterparty_node_id: &PublicKey
) {
for (htlc_src, payment_hash) in htlcs_to_fail.drain(..) {
- match htlc_src {
- HTLCSource::PreviousHopData(HTLCPreviousHopData { .. }) => {
- let (failure_code, onion_failure_data) =
- match self.channel_state.lock().unwrap().by_id.entry(channel_id) {
- hash_map::Entry::Occupied(chan_entry) => {
- self.get_htlc_inbound_temp_fail_err_and_data(0x1000|7, &chan_entry.get())
- },
- hash_map::Entry::Vacant(_) => (0x4000|10, Vec::new())
- };
- let channel_state = self.channel_state.lock().unwrap();
+ let mut channel_state = self.channel_state.lock().unwrap();
+ let (failure_code, onion_failure_data) =
+ match channel_state.by_id.entry(channel_id) {
+ hash_map::Entry::Occupied(chan_entry) => {
+ self.get_htlc_inbound_temp_fail_err_and_data(0x1000|7, &chan_entry.get())
+ },
+ hash_map::Entry::Vacant(_) => (0x4000|10, Vec::new())
+ };
- let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id };
- self.fail_htlc_backwards_internal(channel_state, htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data }, receiver)
- },
- HTLCSource::OutboundRoute { session_priv, payment_id, path, payment_params, .. } => {
- let mut session_priv_bytes = [0; 32];
- session_priv_bytes.copy_from_slice(&session_priv[..]);
- let mut outbounds = self.pending_outbound_payments.lock().unwrap();
- if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
- if payment.get_mut().remove(&session_priv_bytes, Some(&path)) && !payment.get().is_fulfilled() {
- let retry = if let Some(payment_params_data) = payment_params {
- let path_last_hop = path.last().expect("Outbound payments must have had a valid path");
- Some(RouteParameters {
- payment_params: payment_params_data,
- final_value_msat: path_last_hop.fee_msat,
- final_cltv_expiry_delta: path_last_hop.cltv_expiry_delta,
- })
- } else { None };
- let mut pending_events = self.pending_events.lock().unwrap();
- pending_events.push(events::Event::PaymentPathFailed {
- payment_id: Some(payment_id),
- payment_hash,
- rejected_by_dest: false,
- network_update: None,
- all_paths_failed: payment.get().remaining_parts() == 0,
- path: path.clone(),
- short_channel_id: None,
- retry,
- #[cfg(test)]
- error_code: None,
- #[cfg(test)]
- error_data: None,
- });
- if payment.get().abandoned() && payment.get().remaining_parts() == 0 {
- pending_events.push(events::Event::PaymentFailed {
- payment_id,
- payment_hash: payment.get().payment_hash().expect("PendingOutboundPayments::RetriesExceeded always has a payment hash set"),
- });
- payment.remove();
- }
- }
- } else {
- log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
- }
- },
- };
+ let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id };
+ self.fail_htlc_backwards_internal(channel_state, htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data }, receiver);
}
}
events::Event::PaymentPathFailed {
payment_id: Some(payment_id),
payment_hash: payment_hash.clone(),
- rejected_by_dest: !payment_retryable,
+ payment_failed_permanently: !payment_retryable,
network_update,
all_paths_failed,
path: path.clone(),
// channel here as we apparently can't relay through them anyway.
let scid = path.first().unwrap().short_channel_id;
retry.as_mut().map(|r| r.payment_params.previously_failed_channels.push(scid));
- events::Event::PaymentPathFailed {
- payment_id: Some(payment_id),
- payment_hash: payment_hash.clone(),
- rejected_by_dest: path.len() == 1,
- network_update: None,
- all_paths_failed,
- path: path.clone(),
- short_channel_id: Some(scid),
- retry,
+
+ if self.payment_is_probe(payment_hash, &payment_id) {
+ events::Event::ProbeFailed {
+ payment_id: payment_id,
+ payment_hash: payment_hash.clone(),
+ path: path.clone(),
+ short_channel_id: Some(scid),
+ }
+ } else {
+ events::Event::PaymentPathFailed {
+ payment_id: Some(payment_id),
+ payment_hash: payment_hash.clone(),
+ payment_failed_permanently: false,
+ network_update: None,
+ all_paths_failed,
+ path: path.clone(),
+ short_channel_id: Some(scid),
+ retry,
#[cfg(test)]
- error_code: Some(*failure_code),
+ error_code: Some(*failure_code),
#[cfg(test)]
- error_data: Some(data.clone()),
+ error_data: Some(data.clone()),
+ }
}
}
};
if were_node_one == msg_from_node_one {
return Ok(NotifyOption::SkipPersist);
} else {
+ log_debug!(self.logger, "Received channel_update for channel {}.", log_bytes!(chan_id));
try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan);
}
},
///
/// An [`EventHandler`] may safely call back to the provider in order to handle an event.
/// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
- ///
- /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
- /// when processed, an [`EventHandler`] must be able to handle previously seen events when
- /// restarting from an old state.
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
let mut result = NotifyOption::SkipPersist;
}
}
}
- max_time!(self.last_node_announcement_serial);
max_time!(self.highest_seen_timestamp);
let mut payment_secrets = self.pending_inbound_payments.lock().unwrap();
payment_secrets.retain(|_, inbound_payment| {
self.persistence_notifier.wait()
}
+ /// Gets a [`Future`] that completes when a persistable update is available. Note that
+ /// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
+ /// should instead register actions to be taken later.
+ pub fn get_persistable_update_future(&self) -> Future {
+ self.persistence_notifier.get_future()
+ }
+
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_persistence_condvar_value(&self) -> bool {
- let mutcond = &self.persistence_notifier.persistence_lock;
- let &(ref mtx, _) = mutcond;
- let guard = mtx.lock().unwrap();
- *guard
+ self.persistence_notifier.notify_pending()
}
/// Gets the latest best block which was connected either via the [`chain::Listen`] or
&events::MessageSendEvent::SendClosingSigned { ref node_id, .. } => node_id != counterparty_node_id,
&events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != counterparty_node_id,
&events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != counterparty_node_id,
+ &events::MessageSendEvent::SendChannelAnnouncement { ref node_id, .. } => node_id != counterparty_node_id,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
- &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { ref node_id, .. } => node_id != counterparty_node_id,
&events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
let channel_state = &mut *channel_state_lock;
let pending_msg_events = &mut channel_state.pending_msg_events;
channel_state.by_id.retain(|_, chan| {
- if chan.get_counterparty_node_id() == *counterparty_node_id {
+ let retain = if chan.get_counterparty_node_id() == *counterparty_node_id {
if !chan.have_received_message() {
// If we created this (outbound) channel while we were disconnected from the
// peer we probably failed to send the open_channel message, which is now
});
true
}
- } else { true }
+ } else { true };
+ if retain && chan.get_counterparty_node_id() != *counterparty_node_id {
+ if let Some(msg) = chan.get_signed_channel_announcement(self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height()) {
+ if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) {
+ pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement {
+ node_id: *counterparty_node_id,
+ msg, update_msg,
+ });
+ }
+ }
+ }
+ retain
});
//TODO: Also re-broadcast announcement_signatures
}
let _ = self.force_close_channel_with_peer(&msg.channel_id, counterparty_node_id, Some(&msg.data), true);
}
}
-}
-
-/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
-/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
-struct PersistenceNotifier {
- /// Users won't access the persistence_lock directly, but rather wait on its bool using
- /// `wait_timeout` and `wait`.
- persistence_lock: (Mutex<bool>, Condvar),
-}
-impl PersistenceNotifier {
- fn new() -> Self {
- Self {
- persistence_lock: (Mutex::new(false), Condvar::new()),
- }
+ fn provided_node_features(&self) -> NodeFeatures {
+ NodeFeatures::known_channel_features()
}
- fn wait(&self) {
- loop {
- let &(ref mtx, ref cvar) = &self.persistence_lock;
- let mut guard = mtx.lock().unwrap();
- if *guard {
- *guard = false;
- return;
- }
- guard = cvar.wait(guard).unwrap();
- let result = *guard;
- if result {
- *guard = false;
- return
- }
- }
- }
-
- #[cfg(any(test, feature = "std"))]
- fn wait_timeout(&self, max_wait: Duration) -> bool {
- let current_time = Instant::now();
- loop {
- let &(ref mtx, ref cvar) = &self.persistence_lock;
- let mut guard = mtx.lock().unwrap();
- if *guard {
- *guard = false;
- return true;
- }
- guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
- // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
- // desired wait time has actually passed, and if not then restart the loop with a reduced wait
- // time. Note that this logic can be highly simplified through the use of
- // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
- // 1.42.0.
- let elapsed = current_time.elapsed();
- let result = *guard;
- if result || elapsed >= max_wait {
- *guard = false;
- return result;
- }
- match max_wait.checked_sub(elapsed) {
- None => return result,
- Some(_) => continue
- }
- }
- }
-
- // Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
- fn notify(&self) {
- let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
- let mut persistence_lock = persist_mtx.lock().unwrap();
- *persistence_lock = true;
- mem::drop(persistence_lock);
- cnd.notify_all();
+ fn provided_init_features(&self, _their_init_features: &PublicKey) -> InitFeatures {
+ InitFeatures::known_channel_features()
}
}
}
}
- (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
+ // Prior to 0.0.111 we tracked node_announcement serials here, however that now happens in
+ // `PeerManager`, and thus we simply write the `highest_seen_timestamp` twice, which is
+ // likely to be identical.
+ (self.highest_seen_timestamp.load(Ordering::Acquire) as u32).write(writer)?;
(self.highest_seen_timestamp.load(Ordering::Acquire) as u32).write(writer)?;
(pending_inbound_payments.len() as u64).write(writer)?;
}
}
- let last_node_announcement_serial: u32 = Readable::read(reader)?;
+ let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
let highest_seen_timestamp: u32 = Readable::read(reader)?;
let pending_inbound_payment_count: u64 = Readable::read(reader)?;
our_network_pubkey,
secp_ctx,
- last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
highest_seen_timestamp: AtomicUsize::new(highest_seen_timestamp as usize),
per_peer_state: RwLock::new(per_peer_state),
pending_events: Mutex::new(pending_events_read),
pending_background_events: Mutex::new(pending_background_events_read),
total_consistency_lock: RwLock::new(()),
- persistence_notifier: PersistenceNotifier::new(),
+ persistence_notifier: Notifier::new(),
keys_manager: args.keys_manager,
logger: args.logger,
use util::test_utils;
use chain::keysinterface::KeysInterface;
- #[cfg(feature = "std")]
- #[test]
- fn test_wait_timeout() {
- use ln::channelmanager::PersistenceNotifier;
- use sync::Arc;
- use core::sync::atomic::AtomicBool;
- use std::thread;
-
- let persistence_notifier = Arc::new(PersistenceNotifier::new());
- let thread_notifier = Arc::clone(&persistence_notifier);
-
- let exit_thread = Arc::new(AtomicBool::new(false));
- let exit_thread_clone = exit_thread.clone();
- thread::spawn(move || {
- loop {
- let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
- let mut persistence_lock = persist_mtx.lock().unwrap();
- *persistence_lock = true;
- cnd.notify_all();
-
- if exit_thread_clone.load(Ordering::SeqCst) {
- break
- }
- }
- });
-
- // Check that we can block indefinitely until updates are available.
- let _ = persistence_notifier.wait();
-
- // Check that the PersistenceNotifier will return after the given duration if updates are
- // available.
- loop {
- if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
- break
- }
- }
-
- exit_thread.store(true, Ordering::SeqCst);
-
- // Check that the PersistenceNotifier will return after the given duration even if no updates
- // are available.
- loop {
- if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
- break
- }
- }
- }
-
#[test]
fn test_notify_limits() {
// Check that a few cases which don't require the persistence of a new ChannelManager,