/// lock held. Entries may be added with only the `peers` read lock held (though the
/// `Descriptor` value must already exist in `peers`).
node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
+ /// We can only have one thread processing events at once, but we don't usually need the full
+ /// `peers` write lock to do so, so instead we block on this empty mutex when entering
+ /// `process_events`.
+ event_processing_lock: Mutex<()>,
our_node_secret: SecretKey,
ephemeral_key_midstate: Sha256Engine,
custom_message_handler: CMH,
peers: HashMap::new(),
}),
node_id_to_descriptor: Mutex::new(HashMap::new()),
+ event_processing_lock: Mutex::new(()),
our_node_secret,
ephemeral_key_midstate,
peer_counter: AtomicCounter::new(),
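The `event_processing_lock` added above is an instance of a simple concurrency pattern: an otherwise-empty `Mutex<()>` serializes callers of `process_events` without forcing them to hold the `peers` write lock, so threads that only read `peers` are never blocked just because events are being processed. A minimal standalone sketch of that pattern (the `EventProcessor` type and field names here are illustrative stand-ins, not part of this patch):

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Hypothetical stand-in for the real PeerManager state.
struct EventProcessor {
	// Shared state that many threads read concurrently and only rarely mutate.
	peers: RwLock<HashMap<u64, String>>,
	// Empty mutex whose only job is to ensure a single thread runs
	// `process_events` at a time, without taking the `peers` write lock.
	event_processing_lock: Mutex<()>,
}

impl EventProcessor {
	fn process_events(&self) {
		// Serialize event processing: a second caller parks here until the
		// first one finishes, but readers of `peers` proceed unhindered.
		let _single_processor_lock = self.event_processing_lock.lock().unwrap();

		// Most of the work only needs the read lock.
		let peers = self.peers.read().unwrap();
		for (_id, name) in peers.iter() {
			println!("processing events for {}", name);
		}
	}
}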
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
pub fn process_events(&self) {
+ let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+
+ let mut peers_to_disconnect = HashMap::new();
+ let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+ events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer by doing things like announcing channels on another node. We should be willing to
// drop optional-ish messages when send buffers get full!
- let mut peers_lock = self.peers.write().unwrap();
- let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
- events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
- let peers = &mut *peers_lock;
+ let peers_lock = self.peers.read().unwrap();
+ let peers = &*peers_lock;
macro_rules! get_peer_for_forwarding {
($node_id: expr) => {
{
- match self.node_id_to_descriptor.lock().unwrap().get($node_id) {
- Some(descriptor) => match peers.peers.get_mut(&descriptor) {
+ if peers_to_disconnect.get($node_id).is_some() {
+ // If we've "disconnected" this peer, do not send to it.
+ continue;
+ }
+ let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
+ match descriptor_opt {
+ Some(descriptor) => match peers.peers.get(&descriptor) {
Some(peer_mutex) => {
let peer_lock = peer_mutex.lock().unwrap();
									if peer_lock.their_features.is_none() {
										continue;
									}
									peer_lock
},
- None => panic!("Inconsistent peers set state!"),
+ None => {
+ debug_assert!(false, "Inconsistent peers set state!");
+ continue;
+ }
},
None => {
continue;
MessageSendEvent::HandleError { ref node_id, ref action } => {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
- // Note that since we are holding the peers *write* lock we can
- // remove from node_id_to_descriptor immediately (as no other
- // thread can be holding the peer lock if we have the global write
- // lock).
- if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) {
- if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
- if let Some(ref msg) = *msg {
- log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
- log_pubkey!(node_id),
- msg.data);
- let mut peer = peer_mutex.lock().unwrap();
- self.enqueue_message(&mut *peer, msg);
- // This isn't guaranteed to work, but if there is enough free
- // room in the send buffer, put the error message there...
- self.do_attempt_write_data(&mut descriptor, &mut *peer);
- } else {
- log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
- }
- }
- descriptor.disconnect_socket();
- self.message_handler.chan_handler.peer_disconnected(&node_id, false);
- }
+ // We do not have the peers write lock, so we just store that we're
+								// about to disconnect the peer and do it after we finish
+ // processing most messages.
+ peers_to_disconnect.insert(*node_id, msg.clone());
},
msgs::ErrorAction::IgnoreAndLog(level) => {
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
}
for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() {
+ if peers_to_disconnect.get(&node_id).is_some() { continue; }
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
}
- for (descriptor, peer_mutex) in peers.peers.iter_mut() {
+ for (descriptor, peer_mutex) in peers.peers.iter() {
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
}
}
+ if !peers_to_disconnect.is_empty() {
+ let mut peers_lock = self.peers.write().unwrap();
+ let peers = &mut *peers_lock;
+ for (node_id, msg) in peers_to_disconnect.drain() {
+ // Note that since we are holding the peers *write* lock we can
+ // remove from node_id_to_descriptor immediately (as no other
+ // thread can be holding the peer lock if we have the global write
+ // lock).
+
+ if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
+ if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
+ if let Some(msg) = msg {
+ log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+ log_pubkey!(node_id),
+ msg.data);
+ let mut peer = peer_mutex.lock().unwrap();
+ self.enqueue_message(&mut *peer, &msg);
+ // This isn't guaranteed to work, but if there is enough free
+ // room in the send buffer, put the error message there...
+ self.do_attempt_write_data(&mut descriptor, &mut *peer);
+ } else {
+ log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
+ }
+ }
+ descriptor.disconnect_socket();
+ self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ }
+ }
+ }
}
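The disconnect handling above follows a two-phase shape: while only the read lock is held, peers that must be dropped are merely recorded in `peers_to_disconnect`, and the write lock is taken once, at the end, only if that map is non-empty. Below is a condensed sketch of that shape using hypothetical types; the real code keys `peers` by descriptor and re-resolves each `node_id` through `node_id_to_descriptor` while holding the write lock, which also covers anything that changed in the window between dropping the read lock and acquiring the write lock.

use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-ins for the real descriptor/peer types.
type NodeId = u64;
type Peer = String;

fn process_events_sketch(peers: &RwLock<HashMap<NodeId, Peer>>) {
	// Phase 1: under the read lock, handle events and only *record* which
	// peers need to be disconnected.
	let mut peers_to_disconnect: Vec<NodeId> = Vec::new();
	{
		let peers_read = peers.read().unwrap();
		for (node_id, peer) in peers_read.iter() {
			let should_disconnect = peer.is_empty(); // event handling would decide this
			if should_disconnect {
				peers_to_disconnect.push(*node_id);
			}
		}
	} // read lock dropped here

	// Phase 2: take the write lock only if there is actually something to
	// remove, keeping the common case cheap for concurrent readers.
	if !peers_to_disconnect.is_empty() {
		let mut peers_write = peers.write().unwrap();
		for node_id in peers_to_disconnect.drain(..) {
			peers_write.remove(&node_id);
		}
	}
}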
/// Indicates that the given socket descriptor's connection is now closed.