//! call into the provided message handlers (probably a ChannelManager and NetGraphMsgHandler) with messages
//! they should handle, and encoding/sending response messages.
-use bitcoin::secp256k1::key::{SecretKey,PublicKey};
+use bitcoin::secp256k1::{SecretKey,PublicKey};
use ln::features::InitFeatures;
use ln::msgs;
use prelude::*;
use io;
use alloc::collections::LinkedList;
-use sync::{Arc, Mutex, MutexGuard, RwLock};
+use sync::{Arc, Mutex, MutexGuard, FairRwLock};
+use core::sync::atomic::{AtomicBool, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
use core::convert::Infallible;
}
}
-struct PeerHolder<Descriptor: SocketDescriptor> {
- /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold
- /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`]
- /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages
- /// handed to the `MessageHandler`s for a given peer is already guaranteed.
- peers: HashMap<Descriptor, Mutex<Peer>>,
-}
-
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
L::Target: Logger,
CMH::Target: CustomMessageHandler {
message_handler: MessageHandler<CM, RM>,
- peers: RwLock<PeerHolder<Descriptor>>,
+ /// Connection state for each connected peer - we have an outer read-write lock which is
+ /// taken read while we're doing processing for a peer and taken write when a peer is being
+ /// added or removed.
+ ///
+ /// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold
+ /// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires
+ /// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to
+ /// the `MessageHandler`s for a given peer is already guaranteed.
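+ ///
+ /// We use a [`FairRwLock`] (rather than a plain `RwLock`) so that frequent per-peer reads
+ /// cannot starve a writer waiting to add or remove a peer.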
+ peers: FairRwLock<HashMap<Descriptor, Mutex<Peer>>>,
/// Only add to this set when noise completes.
/// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
/// lock held. Entries may be added with only the `peers` read lock held (though the
/// `node_id_to_descriptor` lock must still be held).
node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
/// We can only have one thread processing events at once, but we don't usually need the full
/// `peers` write lock to do so, so instead we block on this empty mutex when entering
/// `process_events`.
event_processing_lock: Mutex<()>,
+ /// Because event processing is global and always does all available work before returning,
+ /// there is no reason for us to have many event processors waiting on the lock at once.
+ /// Instead, we limit the total number of blocked event processors to exactly one by setting
+ /// this flag when a process_events call is waiting.
+ blocked_event_processors: AtomicBool,
our_node_secret: SecretKey,
ephemeral_key_midstate: Sha256Engine,
custom_message_handler: CMH,
PeerManager {
message_handler,
- peers: RwLock::new(PeerHolder {
- peers: HashMap::new(),
- }),
+ peers: FairRwLock::new(HashMap::new()),
node_id_to_descriptor: Mutex::new(HashMap::new()),
event_processing_lock: Mutex::new(()),
+ blocked_event_processors: AtomicBool::new(false),
our_node_secret,
ephemeral_key_midstate,
peer_counter: AtomicCounter::new(),
/// completed and we are sure the remote peer has the private key for the given node_id.
pub fn get_peer_node_ids(&self) -> Vec<PublicKey> {
let peers = self.peers.read().unwrap();
- peers.peers.values().filter_map(|peer_mutex| {
+ peers.values().filter_map(|peer_mutex| {
let p = peer_mutex.lock().unwrap();
if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() {
return None;
let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
let mut peers = self.peers.write().unwrap();
- if peers.peers.insert(descriptor, Mutex::new(Peer {
+ if peers.insert(descriptor, Mutex::new(Peer {
channel_encryptor: peer_encryptor,
their_node_id: None,
their_features: None,
let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
let mut peers = self.peers.write().unwrap();
- if peers.peers.insert(descriptor, Mutex::new(Peer {
+ if peers.insert(descriptor, Mutex::new(Peer {
channel_encryptor: peer_encryptor,
their_node_id: None,
their_features: None,
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
let peers = self.peers.read().unwrap();
- match peers.peers.get(descriptor) {
+ match peers.get(descriptor) {
None => {
// This is most likely a simple race condition where the user found that the socket
// was writeable, then we told the user to `disconnect_socket()`, then they called
let peers = self.peers.read().unwrap();
let mut msgs_to_forward = Vec::new();
let mut peer_node_id = None;
- match peers.peers.get(peer_descriptor) {
+ match peers.get(peer_descriptor) {
None => {
// This is most likely a simple race condition where the user read some bytes
// from the socket, then we told the user to `disconnect_socket()`, then they
if peer.pending_read_is_header {
let msg_len = try_potential_handleerror!(peer,
peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..]));
- peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16);
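+ // Rather than allocating a fresh buffer for each message, reuse the existing one,
+ // dropping it first if it has grown beyond 8 KiB so that one large message doesn't
+ // leave us holding a large allocation for the connection's lifetime.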
+ if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
if msg_len < 2 { // Need at least the message type tag
return Err(PeerHandleError{ no_connection_possible: false });
assert!(msg_data.len() >= 2);
// Reset read buffer
- peer.pending_read_buffer = [0; 18].to_vec();
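+ // As above, shrink an oversized buffer before resizing it for the next header read.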
+ if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
+ peer.pending_read_buffer.resize(18, 0);
peer.pending_read_is_header = true;
let mut reader = io::Cursor::new(&msg_data[..]);
Ok(should_forward)
}
- fn forward_broadcast_msg(&self, peers: &PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
+ fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
match msg {
wire::Message::ChannelAnnouncement(ref msg) => {
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
let encoded_msg = encode_msg!(msg);
- for (_, peer_mutex) in peers.peers.iter() {
+ for (_, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
let encoded_msg = encode_msg!(msg);
- for (_, peer_mutex) in peers.peers.iter() {
+ for (_, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
!peer.should_forward_node_announcement(msg.contents.node_id) {
log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
let encoded_msg = encode_msg!(msg);
- for (_, peer_mutex) in peers.peers.iter() {
+ for (_, peer_mutex) in peers.iter() {
let mut peer = peer_mutex.lock().unwrap();
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
/// or one of the other clients provided in our language bindings.
///
+ /// Note that if there are any other calls to this function waiting on lock(s), this may return
+ /// without doing any work. All available events that need handling will be handled before the
+ /// other calls return.
+ ///
/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
pub fn process_events(&self) {
- let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+ let mut _single_processor_lock = self.event_processing_lock.try_lock();
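+ // try_lock fails immediately (rather than blocking) if another thread is already
+ // processing events, letting us decide whether to queue up behind it or return.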
+ if _single_processor_lock.is_err() {
+ // While we could wake the older sleeper here with a condition variable and make
+ // waiting times more even, that would be a lot of overengineering for the simple
+ // goal of reducing the total waiter count.
+ match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
+ Err(val) => {
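+ // Another thread is already blocked waiting to run process_events once the
+ // current holder finishes; it will handle any events we would have, so we
+ // can return without doing any work.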
+ debug_assert!(val, "compare_exchange failed spuriously?");
+ return;
+ },
+ Ok(val) => {
+ debug_assert!(!val, "compare_exchange succeeded spuriously?");
+ // We're the only waiter: the running process_events may have emptied the
+ // pending events "long" ago and there may be new events for us to process.
+ // Wait until it's done, then process any leftover events before returning.
+ _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
+ self.blocked_event_processors.store(false, Ordering::Release);
+ }
+ }
+ }
let mut peers_to_disconnect = HashMap::new();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
}
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
match descriptor_opt {
- Some(descriptor) => match peers.peers.get(&descriptor) {
+ Some(descriptor) => match peers.get(&descriptor) {
Some(peer_mutex) => {
let peer_lock = peer_mutex.lock().unwrap();
if peer_lock.their_features.is_none() {
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
}
- for (descriptor, peer_mutex) in peers.peers.iter() {
+ for (descriptor, peer_mutex) in peers.iter() {
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
}
}
// lock).
if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
- if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
+ if let Some(peer_mutex) = peers.remove(&descriptor) {
if let Some(msg) = msg {
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
let mut peers = self.peers.write().unwrap();
- let peer_option = peers.peers.remove(descriptor);
+ let peer_option = peers.remove(descriptor);
match peer_option {
None => {
// This is most likely a simple race condition where the user found that the socket
},
Some(peer_lock) => {
let peer = peer_lock.lock().unwrap();
- match peer.their_node_id {
- Some(node_id) => {
- log_trace!(self.logger,
- "Handling disconnection of peer {}, with {}future connection to the peer possible.",
- log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
- self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
- self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
- },
- None => {}
+ if let Some(node_id) = peer.their_node_id {
+ log_trace!(self.logger,
+ "Handling disconnection of peer {}, with {}future connection to the peer possible.",
+ log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
+ self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
+ self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
}
}
};
let mut peers_lock = self.peers.write().unwrap();
if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
- peers_lock.peers.remove(&descriptor);
+ peers_lock.remove(&descriptor);
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
descriptor.disconnect_socket();
}
let mut peers_lock = self.peers.write().unwrap();
self.node_id_to_descriptor.lock().unwrap().clear();
let peers = &mut *peers_lock;
- for (mut descriptor, peer) in peers.peers.drain() {
+ for (mut descriptor, peer) in peers.drain() {
if let Some(node_id) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
{
let peers_lock = self.peers.read().unwrap();
- for (descriptor, peer_mutex) in peers_lock.peers.iter() {
+ for (descriptor, peer_mutex) in peers_lock.iter() {
let mut peer = peer_mutex.lock().unwrap();
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
// The peer needs to complete its handshake before we can exchange messages. We
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
|| peer.awaiting_pong_timer_tick_intervals as u64 >
- MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64
+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
{
descriptors_needing_disconnect.push(descriptor.clone());
continue;
{
let mut peers_lock = self.peers.write().unwrap();
for descriptor in descriptors_needing_disconnect.iter() {
- if let Some(peer) = peers_lock.peers.remove(&descriptor) {
+ if let Some(peer) = peers_lock.remove(descriptor) {
if let Some(node_id) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
#[cfg(test)]
mod tests {
use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
- use ln::msgs;
+ use ln::{msgs, wire};
use ln::msgs::NetAddress;
use util::events;
use util::test_utils;
use bitcoin::secp256k1::Secp256k1;
- use bitcoin::secp256k1::key::{SecretKey, PublicKey};
+ use bitcoin::secp256k1::{SecretKey, PublicKey};
use prelude::*;
use sync::{Arc, Mutex};
let chan_handler = test_utils::TestChannelMessageHandler::new();
let mut peers = create_network(2, &cfgs);
establish_connection(&peers[0], &peers[1]);
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
let secp_ctx = Secp256k1::new();
let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
peers[0].message_handler.chan_handler = &chan_handler;
peers[0].process_events();
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 0);
+ }
+
+ #[test]
+ fn test_send_simple_msg() {
+ // Simple test which builds a network of PeerManagers, connects them and brings them to
+ // NoiseState::Finished, and pushes a message from one peer to another.
+ let cfgs = create_peermgr_cfgs(2);
+ let a_chan_handler = test_utils::TestChannelMessageHandler::new();
+ let b_chan_handler = test_utils::TestChannelMessageHandler::new();
+ let mut peers = create_network(2, &cfgs);
+ let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
+
+ let secp_ctx = Secp256k1::new();
+ let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
+
+ let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() };
+ a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown {
+ node_id: their_id, msg: msg.clone()
+ });
+ peers[0].message_handler.chan_handler = &a_chan_handler;
+
+ b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg));
+ peers[1].message_handler.chan_handler = &b_chan_handler;
+
+ peers[0].process_events();
+
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
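+ // read_event returns true only when further reads should pause due to backpressure;
+ // here we expect the message to be consumed without pausing.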
+ assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
+ }
+
+ #[test]
+ fn test_disconnect_all_peer() {
+ // Simple test which builds a network of PeerManagers, connects them and brings them to
+ // NoiseState::Finished, and then calls disconnect_all_peers.
+ let cfgs = create_peermgr_cfgs(2);
+ let peers = create_network(2, &cfgs);
+ establish_connection(&peers[0], &peers[1]);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
+
+ peers[0].disconnect_all_peers();
+ assert_eq!(peers[0].peers.read().unwrap().len(), 0);
}
#[test]
let cfgs = create_peermgr_cfgs(2);
let peers = create_network(2, &cfgs);
establish_connection(&peers[0], &peers[1]);
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
// peers[0] awaiting_pong is set to true, but the Peer is still connected
peers[0].timer_tick_occurred();
peers[0].process_events();
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
// Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
peers[0].timer_tick_occurred();
peers[0].process_events();
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 0);
}
#[test]
peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
// If we get a single timer tick before completion, that's fine
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
peers[0].timer_tick_occurred();
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 1);
assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
peers[0].process_events();
// ...but if we get a second timer tick, we should disconnect the peer
peers[0].timer_tick_occurred();
- assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
+ assert_eq!(peers[0].peers.read().unwrap().len(), 0);
assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
}