Make `htlc_maximum_msat` a required field.
type T = ValidatedBlockHeader;
fn validate(self, block_hash: BlockHash) -> BlockSourceResult<Self::T> {
- self.header
+ let pow_valid_block_hash = self.header
.validate_pow(&self.header.target())
.or_else(|e| Err(BlockSourceError::persistent(e)))?;
- // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream.
- if self.header.block_hash() != block_hash {
+ if pow_valid_block_hash != block_hash {
return Err(BlockSourceError::persistent("invalid block hash"));
}
type T = ValidatedBlock;
fn validate(self, block_hash: BlockHash) -> BlockSourceResult<Self::T> {
- self.header
+ let pow_valid_block_hash = self.header
.validate_pow(&self.header.target())
.or_else(|e| Err(BlockSourceError::persistent(e)))?;
- // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream.
- if self.block_hash() != block_hash {
+ if pow_valid_block_hash != block_hash {
return Err(BlockSourceError::persistent("invalid block hash"));
}
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
use lightning::util::logger::Logger;
+use std::ops::Deref;
use std::task;
use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
id: u64,
}
impl Connection {
- async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+ async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+ {
loop {
if event_receiver.recv().await.is_none() {
return;
}
}
- async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+ async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
+ RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+ L::Target: Logger + 'static + Send + Sync,
+ UMH::Target: CustomMessageHandler + 'static + Send + Sync,
+ {
// Create a waker to wake up poll_event_process, above
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+{
let remote_addr = get_addr_from_stream(&stream);
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+{
let remote_addr = get_addr_from_stream(&stream);
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
/// disconnected and associated handling futures are freed, though, because all processing in said
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
/// make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+{
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
Some(setup_outbound(peer_manager, their_node_id, stream))
} else { None }
pub const MIN_RELAY_FEE_SAT_PER_1000_WEIGHT: u64 = 4000;
/// Minimum feerate that takes a sane approach to bitcoind weight-to-vbytes rounding.
/// See the following Core Lightning commit for an explanation:
-/// https://github.com/ElementsProject/lightning/commit/2e687b9b352c9092b5e8bd4a688916ac50b44af0
+/// <https://github.com/ElementsProject/lightning/commit/2e687b9b352c9092b5e8bd4a688916ac50b44af0>
pub const FEERATE_FLOOR_SATS_PER_KW: u32 = 253;
/// Wraps a `Deref` to a `FeeEstimator` so that any fee estimations provided by it
}
impl<Signer: Sign> ChannelMonitor<Signer> {
+ /// For lockorder enforcement purposes, we need to have a single site which constructs the
+ /// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our
+ /// PartialEq implementation) we may decide a lockorder violation has occurred.
+ fn from_impl(imp: ChannelMonitorImpl<Signer>) -> Self {
+ ChannelMonitor { inner: Mutex::new(imp) }
+ }
+
pub(crate) fn new(secp_ctx: Secp256k1<secp256k1::All>, keys: Signer, shutdown_script: Option<Script>,
on_counterparty_tx_csv: u16, destination_script: &Script, funding_info: (OutPoint, Script),
channel_parameters: &ChannelTransactionParameters,
let mut outputs_to_watch = HashMap::new();
outputs_to_watch.insert(funding_info.0.txid, vec![(funding_info.0.index as u32, funding_info.1.clone())]);
- ChannelMonitor {
- inner: Mutex::new(ChannelMonitorImpl {
- latest_update_id: 0,
- commitment_transaction_number_obscure_factor,
+ Self::from_impl(ChannelMonitorImpl {
+ latest_update_id: 0,
+ commitment_transaction_number_obscure_factor,
- destination_script: destination_script.clone(),
- broadcasted_holder_revokable_script: None,
- counterparty_payment_script,
- shutdown_script,
+ destination_script: destination_script.clone(),
+ broadcasted_holder_revokable_script: None,
+ counterparty_payment_script,
+ shutdown_script,
- channel_keys_id,
- holder_revocation_basepoint,
- funding_info,
- current_counterparty_commitment_txid: None,
- prev_counterparty_commitment_txid: None,
+ channel_keys_id,
+ holder_revocation_basepoint,
+ funding_info,
+ current_counterparty_commitment_txid: None,
+ prev_counterparty_commitment_txid: None,
- counterparty_commitment_params,
- funding_redeemscript,
- channel_value_satoshis,
- their_cur_per_commitment_points: None,
+ counterparty_commitment_params,
+ funding_redeemscript,
+ channel_value_satoshis,
+ their_cur_per_commitment_points: None,
- on_holder_tx_csv: counterparty_channel_parameters.selected_contest_delay,
+ on_holder_tx_csv: counterparty_channel_parameters.selected_contest_delay,
- commitment_secrets: CounterpartyCommitmentSecrets::new(),
- counterparty_claimable_outpoints: HashMap::new(),
- counterparty_commitment_txn_on_chain: HashMap::new(),
- counterparty_hash_commitment_number: HashMap::new(),
+ commitment_secrets: CounterpartyCommitmentSecrets::new(),
+ counterparty_claimable_outpoints: HashMap::new(),
+ counterparty_commitment_txn_on_chain: HashMap::new(),
+ counterparty_hash_commitment_number: HashMap::new(),
- prev_holder_signed_commitment_tx: None,
- current_holder_commitment_tx: holder_commitment_tx,
- current_counterparty_commitment_number: 1 << 48,
- current_holder_commitment_number,
+ prev_holder_signed_commitment_tx: None,
+ current_holder_commitment_tx: holder_commitment_tx,
+ current_counterparty_commitment_number: 1 << 48,
+ current_holder_commitment_number,
- payment_preimages: HashMap::new(),
- pending_monitor_events: Vec::new(),
- pending_events: Vec::new(),
+ payment_preimages: HashMap::new(),
+ pending_monitor_events: Vec::new(),
+ pending_events: Vec::new(),
- onchain_events_awaiting_threshold_conf: Vec::new(),
- outputs_to_watch,
+ onchain_events_awaiting_threshold_conf: Vec::new(),
+ outputs_to_watch,
- onchain_tx_handler,
+ onchain_tx_handler,
- lockdown_from_offchain: false,
- holder_tx_signed: false,
- funding_spend_seen: false,
- funding_spend_confirmed: None,
- htlcs_resolved_on_chain: Vec::new(),
+ lockdown_from_offchain: false,
+ holder_tx_signed: false,
+ funding_spend_seen: false,
+ funding_spend_confirmed: None,
+ htlcs_resolved_on_chain: Vec::new(),
- best_block,
- counterparty_node_id: Some(counterparty_node_id),
+ best_block,
+ counterparty_node_id: Some(counterparty_node_id),
- secp_ctx,
- }),
- }
+ secp_ctx,
+ })
}
#[cfg(test)]
let mut secp_ctx = Secp256k1::new();
secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes());
- Ok((best_block.block_hash(), ChannelMonitor {
- inner: Mutex::new(ChannelMonitorImpl {
- latest_update_id,
- commitment_transaction_number_obscure_factor,
+ Ok((best_block.block_hash(), ChannelMonitor::from_impl(ChannelMonitorImpl {
+ latest_update_id,
+ commitment_transaction_number_obscure_factor,
- destination_script,
- broadcasted_holder_revokable_script,
- counterparty_payment_script,
- shutdown_script,
+ destination_script,
+ broadcasted_holder_revokable_script,
+ counterparty_payment_script,
+ shutdown_script,
- channel_keys_id,
- holder_revocation_basepoint,
- funding_info,
- current_counterparty_commitment_txid,
- prev_counterparty_commitment_txid,
+ channel_keys_id,
+ holder_revocation_basepoint,
+ funding_info,
+ current_counterparty_commitment_txid,
+ prev_counterparty_commitment_txid,
- counterparty_commitment_params,
- funding_redeemscript,
- channel_value_satoshis,
- their_cur_per_commitment_points,
+ counterparty_commitment_params,
+ funding_redeemscript,
+ channel_value_satoshis,
+ their_cur_per_commitment_points,
- on_holder_tx_csv,
+ on_holder_tx_csv,
- commitment_secrets,
- counterparty_claimable_outpoints,
- counterparty_commitment_txn_on_chain,
- counterparty_hash_commitment_number,
+ commitment_secrets,
+ counterparty_claimable_outpoints,
+ counterparty_commitment_txn_on_chain,
+ counterparty_hash_commitment_number,
- prev_holder_signed_commitment_tx,
- current_holder_commitment_tx,
- current_counterparty_commitment_number,
- current_holder_commitment_number,
+ prev_holder_signed_commitment_tx,
+ current_holder_commitment_tx,
+ current_counterparty_commitment_number,
+ current_holder_commitment_number,
- payment_preimages,
- pending_monitor_events: pending_monitor_events.unwrap(),
- pending_events,
+ payment_preimages,
+ pending_monitor_events: pending_monitor_events.unwrap(),
+ pending_events,
- onchain_events_awaiting_threshold_conf,
- outputs_to_watch,
+ onchain_events_awaiting_threshold_conf,
+ outputs_to_watch,
- onchain_tx_handler,
+ onchain_tx_handler,
- lockdown_from_offchain,
- holder_tx_signed,
- funding_spend_seen: funding_spend_seen.unwrap(),
- funding_spend_confirmed,
- htlcs_resolved_on_chain: htlcs_resolved_on_chain.unwrap(),
+ lockdown_from_offchain,
+ holder_tx_signed,
+ funding_spend_seen: funding_spend_seen.unwrap(),
+ funding_spend_confirmed,
+ htlcs_resolved_on_chain: htlcs_resolved_on_chain.unwrap(),
- best_block,
- counterparty_node_id,
+ best_block,
+ counterparty_node_id,
- secp_ctx,
- }),
- }))
+ secp_ctx,
+ })))
}
}
use core::ops::{Deref, DerefMut};
use core::time::Duration;
-use std::collections::HashSet;
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
-
use std::sync::Mutex as StdMutex;
use std::sync::MutexGuard as StdMutexGuard;
use std::sync::RwLock as StdRwLock;
use std::sync::RwLockWriteGuard as StdRwLockWriteGuard;
use std::sync::Condvar as StdCondvar;
+use prelude::HashMap;
+
#[cfg(feature = "backtrace")]
-use backtrace::Backtrace;
+use {prelude::hash_map, backtrace::Backtrace, std::sync::Once};
+
+#[cfg(not(feature = "backtrace"))]
+struct Backtrace{}
+#[cfg(not(feature = "backtrace"))]
+impl Backtrace { fn new() -> Backtrace { Backtrace {} } }
pub type LockResult<Guard> = Result<Guard, ()>;
thread_local! {
/// We track the set of locks currently held by a reference to their `LockMetadata`
- static LOCKS_HELD: RefCell<HashSet<Arc<LockMetadata>>> = RefCell::new(HashSet::new());
+ static LOCKS_HELD: RefCell<HashMap<u64, Arc<LockMetadata>>> = RefCell::new(HashMap::new());
}
static LOCK_IDX: AtomicUsize = AtomicUsize::new(0);
+#[cfg(feature = "backtrace")]
+static mut LOCKS: Option<StdMutex<HashMap<String, Arc<LockMetadata>>>> = None;
+#[cfg(feature = "backtrace")]
+static LOCKS_INIT: Once = Once::new();
+
/// Metadata about a single lock, by id, the set of things locked-before it, and the backtrace of
/// when the Mutex itself was constructed.
struct LockMetadata {
lock_idx: u64,
- locked_before: StdMutex<HashSet<Arc<LockMetadata>>>,
- #[cfg(feature = "backtrace")]
- lock_construction_bt: Backtrace,
+ locked_before: StdMutex<HashMap<u64, LockDep>>,
+ _lock_construction_bt: Backtrace,
}
-impl PartialEq for LockMetadata {
- fn eq(&self, o: &LockMetadata) -> bool { self.lock_idx == o.lock_idx }
+
+struct LockDep {
+ lock: Arc<LockMetadata>,
+ lockdep_trace: Backtrace,
}
-impl Eq for LockMetadata {}
-impl std::hash::Hash for LockMetadata {
- fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) { hasher.write_u64(self.lock_idx); }
+
+#[cfg(feature = "backtrace")]
+fn get_construction_location(backtrace: &Backtrace) -> String {
+ // Find the first frame that is after `debug_sync` (or that is in our tests) and use
+ // that as the mutex construction site. Note that the first few frames may be in
+ // the `backtrace` crate, so we have to ignore those.
+ let sync_mutex_constr_regex = regex::Regex::new(r"lightning.*debug_sync.*new").unwrap();
+ let mut found_debug_sync = false;
+ for frame in backtrace.frames() {
+ for symbol in frame.symbols() {
+ let symbol_name = symbol.name().unwrap().as_str().unwrap();
+ if !sync_mutex_constr_regex.is_match(symbol_name) {
+ if found_debug_sync {
+ if let Some(col) = symbol.colno() {
+ return format!("{}:{}:{}", symbol.filename().unwrap().display(), symbol.lineno().unwrap(), col);
+ } else {
+ // Windows debug symbols don't support column numbers, so fall back to
+ // line numbers only if no `colno` is available
+ return format!("{}:{}", symbol.filename().unwrap().display(), symbol.lineno().unwrap());
+ }
+ }
+ } else { found_debug_sync = true; }
+ }
+ }
+ panic!("Couldn't find mutex construction callsite");
}
impl LockMetadata {
- fn new() -> LockMetadata {
- LockMetadata {
- locked_before: StdMutex::new(HashSet::new()),
- lock_idx: LOCK_IDX.fetch_add(1, Ordering::Relaxed) as u64,
- #[cfg(feature = "backtrace")]
- lock_construction_bt: Backtrace::new(),
+ fn new() -> Arc<LockMetadata> {
+ let backtrace = Backtrace::new();
+ let lock_idx = LOCK_IDX.fetch_add(1, Ordering::Relaxed) as u64;
+
+ let res = Arc::new(LockMetadata {
+ locked_before: StdMutex::new(HashMap::new()),
+ lock_idx,
+ _lock_construction_bt: backtrace,
+ });
+
+ #[cfg(feature = "backtrace")]
+ {
+ let lock_constr_location = get_construction_location(&res._lock_construction_bt);
+ LOCKS_INIT.call_once(|| { unsafe { LOCKS = Some(StdMutex::new(HashMap::new())); } });
+ let mut locks = unsafe { LOCKS.as_ref() }.unwrap().lock().unwrap();
+ match locks.entry(lock_constr_location) {
+ hash_map::Entry::Occupied(e) => return Arc::clone(e.get()),
+ hash_map::Entry::Vacant(e) => { e.insert(Arc::clone(&res)); },
+ }
}
+ res
}
// Returns whether we were a recursive lock (only relevant for read)
// For each lock which is currently locked, check that no lock's locked-before
// set includes the lock we're about to lock, which would imply a lockorder
// inversion.
- for locked in held.borrow().iter() {
- if read && *locked == *this {
+ for (locked_idx, _locked) in held.borrow().iter() {
+ if read && *locked_idx == this.lock_idx {
// Recursive read locks are explicitly allowed
return;
}
}
- for locked in held.borrow().iter() {
- if !read && *locked == *this {
- panic!("Tried to lock a lock while it was held!");
+ for (locked_idx, locked) in held.borrow().iter() {
+ if !read && *locked_idx == this.lock_idx {
+ // With `feature = "backtrace"` set, we may be looking at different instances
+ // of the same lock.
+ debug_assert!(cfg!(feature = "backtrace"), "Tried to acquire a lock while it was held!");
}
- for locked_dep in locked.locked_before.lock().unwrap().iter() {
- if *locked_dep == *this {
+ for (locked_dep_idx, locked_dep) in locked.locked_before.lock().unwrap().iter() {
+ if *locked_dep_idx == this.lock_idx && *locked_dep_idx != locked.lock_idx {
#[cfg(feature = "backtrace")]
- panic!("Tried to violate existing lockorder.\nMutex that should be locked after the current lock was created at the following backtrace.\nNote that to get a backtrace for the lockorder violation, you should set RUST_BACKTRACE=1\n{:?}", locked.lock_construction_bt);
+ panic!("Tried to violate existing lockorder.\nMutex that should be locked after the current lock was created at the following backtrace.\nNote that to get a backtrace for the lockorder violation, you should set RUST_BACKTRACE=1\nLock being taken constructed at: {} ({}):\n{:?}\nLock constructed at: {} ({})\n{:?}\n\nLock dep created at:\n{:?}\n\n",
+ get_construction_location(&this._lock_construction_bt), this.lock_idx, this._lock_construction_bt,
+ get_construction_location(&locked._lock_construction_bt), locked.lock_idx, locked._lock_construction_bt,
+ locked_dep.lockdep_trace);
#[cfg(not(feature = "backtrace"))]
panic!("Tried to violate existing lockorder. Build with the backtrace feature for more info.");
}
}
// Insert any already-held locks in our locked-before set.
- this.locked_before.lock().unwrap().insert(Arc::clone(locked));
+ let mut locked_before = this.locked_before.lock().unwrap();
+ if !locked_before.contains_key(&locked.lock_idx) {
+ let lockdep = LockDep { lock: Arc::clone(locked), lockdep_trace: Backtrace::new() };
+ locked_before.insert(lockdep.lock.lock_idx, lockdep);
+ }
}
- held.borrow_mut().insert(Arc::clone(this));
+ held.borrow_mut().insert(this.lock_idx, Arc::clone(this));
inserted = true;
});
inserted
// Since a try-lock will simply fail if the lock is held already, we do not
// consider try-locks to ever generate lockorder inversions. However, if a try-lock
// succeeds, we do consider it to have created lockorder dependencies.
- for locked in held.borrow().iter() {
- this.locked_before.lock().unwrap().insert(Arc::clone(locked));
+ let mut locked_before = this.locked_before.lock().unwrap();
+ for (locked_idx, locked) in held.borrow().iter() {
+ if !locked_before.contains_key(locked_idx) {
+ let lockdep = LockDep { lock: Arc::clone(locked), lockdep_trace: Backtrace::new() };
+ locked_before.insert(*locked_idx, lockdep);
+ }
}
- held.borrow_mut().insert(Arc::clone(this));
+ held.borrow_mut().insert(this.lock_idx, Arc::clone(this));
});
}
}
impl<T: Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
LOCKS_HELD.with(|held| {
- held.borrow_mut().remove(&self.mutex.deps);
+ held.borrow_mut().remove(&self.mutex.deps.lock_idx);
});
}
}
impl<T> Mutex<T> {
pub fn new(inner: T) -> Mutex<T> {
- Mutex { inner: StdMutex::new(inner), deps: Arc::new(LockMetadata::new()) }
+ Mutex { inner: StdMutex::new(inner), deps: LockMetadata::new() }
}
pub fn lock<'a>(&'a self) -> LockResult<MutexGuard<'a, T>> {
return;
}
LOCKS_HELD.with(|held| {
- held.borrow_mut().remove(&self.lock.deps);
+ held.borrow_mut().remove(&self.lock.deps.lock_idx);
});
}
}
impl<T: Sized> Drop for RwLockWriteGuard<'_, T> {
fn drop(&mut self) {
LOCKS_HELD.with(|held| {
- held.borrow_mut().remove(&self.lock.deps);
+ held.borrow_mut().remove(&self.lock.deps.lock_idx);
});
}
}
impl<T> RwLock<T> {
pub fn new(inner: T) -> RwLock<T> {
- RwLock { inner: StdRwLock::new(inner), deps: Arc::new(LockMetadata::new()) }
+ RwLock { inner: StdRwLock::new(inner), deps: LockMetadata::new() }
}
pub fn read<'a>(&'a self) -> LockResult<RwLockReadGuard<'a, T>> {
}
}
-#[test]
-#[should_panic]
-fn recursive_lock_fail() {
- let mutex = Mutex::new(());
- let _a = mutex.lock().unwrap();
- let _b = mutex.lock().unwrap();
-}
-
-#[test]
-fn recursive_read() {
- let lock = RwLock::new(());
- let _a = lock.read().unwrap();
- let _b = lock.read().unwrap();
-}
+pub type FairRwLock<T> = RwLock<T>;
-#[test]
-#[should_panic]
-fn lockorder_fail() {
- let a = Mutex::new(());
- let b = Mutex::new(());
- {
- let _a = a.lock().unwrap();
- let _b = b.lock().unwrap();
- }
- {
- let _b = b.lock().unwrap();
- let _a = a.lock().unwrap();
+mod tests {
+ use super::{RwLock, Mutex};
+
+ #[test]
+ #[should_panic]
+ #[cfg(not(feature = "backtrace"))]
+ fn recursive_lock_fail() {
+ let mutex = Mutex::new(());
+ let _a = mutex.lock().unwrap();
+ let _b = mutex.lock().unwrap();
+ }
+
+ #[test]
+ fn recursive_read() {
+ let lock = RwLock::new(());
+ let _a = lock.read().unwrap();
+ let _b = lock.read().unwrap();
+ }
+
+ #[test]
+ #[should_panic]
+ fn lockorder_fail() {
+ let a = Mutex::new(());
+ let b = Mutex::new(());
+ {
+ let _a = a.lock().unwrap();
+ let _b = b.lock().unwrap();
+ }
+ {
+ let _b = b.lock().unwrap();
+ let _a = a.lock().unwrap();
+ }
}
-}
-#[test]
-#[should_panic]
-fn write_lockorder_fail() {
- let a = RwLock::new(());
- let b = RwLock::new(());
- {
- let _a = a.write().unwrap();
- let _b = b.write().unwrap();
- }
- {
- let _b = b.write().unwrap();
- let _a = a.write().unwrap();
+ #[test]
+ #[should_panic]
+ fn write_lockorder_fail() {
+ let a = RwLock::new(());
+ let b = RwLock::new(());
+ {
+ let _a = a.write().unwrap();
+ let _b = b.write().unwrap();
+ }
+ {
+ let _b = b.write().unwrap();
+ let _a = a.write().unwrap();
+ }
}
-}
-#[test]
-#[should_panic]
-fn read_lockorder_fail() {
- let a = RwLock::new(());
- let b = RwLock::new(());
- {
- let _a = a.read().unwrap();
- let _b = b.read().unwrap();
- }
- {
- let _b = b.read().unwrap();
- let _a = a.read().unwrap();
+ #[test]
+ #[should_panic]
+ fn read_lockorder_fail() {
+ let a = RwLock::new(());
+ let b = RwLock::new(());
+ {
+ let _a = a.read().unwrap();
+ let _b = b.read().unwrap();
+ }
+ {
+ let _b = b.read().unwrap();
+ let _a = a.read().unwrap();
+ }
}
-}
-#[test]
-fn read_recurisve_no_lockorder() {
- // Like the above, but note that no lockorder is implied when we recursively read-lock a
- // RwLock, causing this to pass just fine.
- let a = RwLock::new(());
- let b = RwLock::new(());
- let _outer = a.read().unwrap();
- {
- let _a = a.read().unwrap();
- let _b = b.read().unwrap();
- }
- {
- let _b = b.read().unwrap();
- let _a = a.read().unwrap();
+ #[test]
+ fn read_recursive_no_lockorder() {
+ // Like the above, but note that no lockorder is implied when we recursively read-lock a
+ // RwLock, causing this to pass just fine.
+ let a = RwLock::new(());
+ let b = RwLock::new(());
+ let _outer = a.read().unwrap();
+ {
+ let _a = a.read().unwrap();
+ let _b = b.read().unwrap();
+ }
+ {
+ let _b = b.read().unwrap();
+ let _a = a.read().unwrap();
+ }
}
-}
-#[test]
-#[should_panic]
-fn read_write_lockorder_fail() {
- let a = RwLock::new(());
- let b = RwLock::new(());
- {
- let _a = a.write().unwrap();
- let _b = b.read().unwrap();
- }
- {
- let _b = b.read().unwrap();
- let _a = a.write().unwrap();
+ #[test]
+ #[should_panic]
+ fn read_write_lockorder_fail() {
+ let a = RwLock::new(());
+ let b = RwLock::new(());
+ {
+ let _a = a.write().unwrap();
+ let _b = b.read().unwrap();
+ }
+ {
+ let _b = b.read().unwrap();
+ let _a = a.write().unwrap();
+ }
}
}
-
-pub type FairRwLock<T> = RwLock<T>;
}
}
+ let broadcaster = test_utils::TestBroadcaster {
+ txn_broadcasted: Mutex::new(self.tx_broadcaster.txn_broadcasted.lock().unwrap().clone()),
+ blocks: Arc::new(Mutex::new(self.tx_broadcaster.blocks.lock().unwrap().clone())),
+ };
+
// Before using all the new monitors to check the watch outpoints, use the full set of
// them to ensure we can write and reload our ChannelManager.
{
keys_manager: self.keys_manager,
fee_estimator: &test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) },
chain_monitor: self.chain_monitor,
- tx_broadcaster: &test_utils::TestBroadcaster {
- txn_broadcasted: Mutex::new(self.tx_broadcaster.txn_broadcasted.lock().unwrap().clone()),
- blocks: Arc::new(Mutex::new(self.tx_broadcaster.blocks.lock().unwrap().clone())),
- },
+ tx_broadcaster: &broadcaster,
logger: &self.logger,
channel_monitors,
}).unwrap();
}
let persister = test_utils::TestPersister::new();
- let broadcaster = test_utils::TestBroadcaster {
- txn_broadcasted: Mutex::new(self.tx_broadcaster.txn_broadcasted.lock().unwrap().clone()),
- blocks: Arc::new(Mutex::new(self.tx_broadcaster.blocks.lock().unwrap().clone())),
- };
let chain_source = test_utils::TestChainSource::new(Network::Testnet);
let chain_monitor = test_utils::TestChainMonitor::new(Some(&chain_source), &broadcaster, &self.logger, &feeest, &persister, &self.keys_manager);
for deserialized_monitor in deserialized_monitors.drain(..) {
check_added_monitors!(nodes[0], 1);
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
- let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(node_txn.len(), 3);
assert_eq!(node_txn[0], node_txn[1]);
check_added_monitors!(nodes[0], 1);
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
- let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(node_txn.len(), 1);
check_spends!(node_txn[0], chan.3);
assert_eq!(node_txn[0].output.len(), 2); // We can't force trimming of to_remote output as channel_reserve_satoshis block us to do so at channel opening
check_closed_event!(nodes[0], 1, ClosureReason::CommitmentTxConfirmed);
connect_blocks(&nodes[0], TEST_FINAL_CLTV - 1); // Confirm blocks until the HTLC expires
- let revoked_htlc_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ let revoked_htlc_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(revoked_htlc_txn.len(), 2);
check_spends!(revoked_htlc_txn[0], chan_1.3);
assert_eq!(revoked_htlc_txn[1].input.len(), 1);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
connect_blocks(&nodes[1], 49); // Confirm blocks until the HTLC expires (note CLTV was explicitly 50 above)
- let revoked_htlc_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ let revoked_htlc_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(revoked_htlc_txn.len(), 3);
check_spends!(revoked_htlc_txn[1], chan.3);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
- let mut guard = nodes[0].node.channel_state.lock().unwrap();
- let keys = guard.by_id.get_mut(&channel_id).unwrap().get_signer();
+ let per_commitment_secret;
+ let next_per_commitment_point;
+ {
+ let mut guard = nodes[0].node.channel_state.lock().unwrap();
+ let keys = guard.by_id.get_mut(&channel_id).unwrap().get_signer();
- const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
+ const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
- // Make signer believe we got a counterparty signature, so that it allows the revocation
- keys.get_enforcement_state().last_holder_commitment -= 1;
- let per_commitment_secret = keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER);
+ // Make signer believe we got a counterparty signature, so that it allows the revocation
+ keys.get_enforcement_state().last_holder_commitment -= 1;
+ per_commitment_secret = keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER);
- // Must revoke without gaps
- keys.get_enforcement_state().last_holder_commitment -= 1;
- keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 1);
+ // Must revoke without gaps
+ keys.get_enforcement_state().last_holder_commitment -= 1;
+ keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 1);
- keys.get_enforcement_state().last_holder_commitment -= 1;
- let next_per_commitment_point = PublicKey::from_secret_key(&Secp256k1::new(),
- &SecretKey::from_slice(&keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 2)).unwrap());
+ keys.get_enforcement_state().last_holder_commitment -= 1;
+ next_per_commitment_point = PublicKey::from_secret_key(&Secp256k1::new(),
+ &SecretKey::from_slice(&keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 2)).unwrap());
+ }
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(),
&msgs::RevokeAndACK { channel_id, per_commitment_secret, next_per_commitment_point });
// `MessageSendEvent::SendAcceptChannel` event. The message is passed to `nodes[0]`
// `handle_accept_channel`, which is required in order for `create_funding_transaction` to
// succeed when `nodes[0]` is passed to it.
- {
+ let accept_chan_msg = {
let mut lock;
let channel = get_channel_ref!(&nodes[1], lock, temp_channel_id);
- let accept_chan_msg = channel.get_accept_channel_message();
- nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), InitFeatures::known(), &accept_chan_msg);
- }
+ channel.get_accept_channel_message()
+ };
+ nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), InitFeatures::known(), &accept_chan_msg);
let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100000, 42);
peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
peer_a.process_events();
- assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
peer_b.process_events();
- assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peer_a.read_event(&mut fd_a, &b_data).unwrap(), false);
+
peer_a.process_events();
- assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
(fd_a.clone(), fd_b.clone())
}
assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
peers[0].process_events();
- assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
peers[1].process_events();
// ...but if we get a second timer tick, we should disconnect the peer
peers[0].timer_tick_occurred();
assert_eq!(peers[0].peers.read().unwrap().len(), 0);
- assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+ let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+ assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
}
#[test]
check_added_monitors!(nodes[2], 1);
check_closed_broadcast!(nodes[2], true); // We should get a BroadcastChannelUpdate (and *only* a BroadcstChannelUpdate)
check_closed_event!(nodes[2], 1, ClosureReason::CommitmentTxConfirmed);
- let node_2_commitment_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ let node_2_commitment_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(node_2_commitment_txn.len(), 3); // ChannelMonitor: 1 offered HTLC-Claim, ChannelManger: 1 local commitment tx, 1 Received HTLC-Claim
assert_eq!(node_2_commitment_txn[1].output.len(), 2); // to-remote and Received HTLC (to-self is dust)
check_spends!(node_2_commitment_txn[1], chan_2.3);