X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=a7f35a59c275baeb70ca6fdb607fe85283c4e965;hb=5ba2f8091f06f696d62d94c70a339f14300dbbda;hp=a20b316eb0a93e997a162e9857ef3cd60ad02373;hpb=bada71394e96971bcf29fe997ecc9602ec305da4;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index a20b316e..a7f35a59 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -26,7 +26,7 @@ use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager use crate::util::ser::{VecWriter, Writeable, Writer}; use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use crate::ln::wire; -use crate::ln::wire::Encode; +use crate::ln::wire::{Encode, Type}; use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger}; use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias}; use crate::util::atomic_counter::AtomicCounter; @@ -36,7 +36,7 @@ use crate::prelude::*; use crate::io; use alloc::collections::LinkedList; use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock}; -use core::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; @@ -696,15 +696,18 @@ pub struct PeerManager>, - /// We can only have one thread processing events at once, but we don't usually need the full - /// `peers` write lock to do so, so instead we block on this empty mutex when entering - /// `process_events`. - event_processing_lock: Mutex<()>, - /// Because event processing is global and always does all available work before returning, - /// there is no reason for us to have many event processors waiting on the lock at once. - /// Instead, we limit the total blocked event processors to always exactly one by setting this - /// when an event process call is waiting. - blocked_event_processors: AtomicBool, + /// We can only have one thread processing events at once, but if a second call to + /// `process_events` happens while a first call is in progress, one of the two calls needs to + /// start from the top to ensure any new messages are also handled. + /// + /// Because the event handler calls into user code which may block, we don't want to block a + /// second thread waiting for another thread to handle events which is then blocked on user + /// code, so we store an atomic counter here: + /// * 0 indicates no event processor is running + /// * 1 indicates an event processor is running + /// * > 1 indicates an event processor is running but needs to start again from the top once + /// it finishes as another thread tried to start processing events but returned early. + event_processing_state: AtomicI32, /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this /// value increases strictly since we don't assume access to a time source. @@ -874,8 +877,7 @@ impl x, Err(e) => { match e.action { - msgs::ErrorAction::DisconnectPeer { msg: _ } => { - //TODO: Try to push msg + msgs::ErrorAction::DisconnectPeer { .. } => { + // We may have an `ErrorMessage` to send to the peer, + // but writing to the socket while reading can lead to + // re-entrant code and possibly unexpected behavior. The + // message send is optimistic anyway, and in this case + // we immediately disconnect the peer. + log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); + return Err(PeerHandleError { }); + }, + msgs::ErrorAction::DisconnectPeerWithWarning { .. } => { + // We have a `WarningMessage` to send to the peer, but + // writing to the socket while reading can lead to + // re-entrant code and possibly unexpected behavior. The + // message send is optimistic anyway, and in this case + // we immediately disconnect the peer. log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); return Err(PeerHandleError { }); }, @@ -1360,7 +1375,7 @@ impl x, Err(e) => { match e { - // Note that to avoid recursion we never call + // Note that to avoid re-entrancy we never call // `do_attempt_write_data` from here, causing // the messages enqueued here to not actually // be sent before the peer is disconnected. @@ -1381,9 +1396,8 @@ impl { - log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) }); + (msgs::DecodeError::UnknownRequiredFeature, _) => { + log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); return Err(PeerHandleError { }); } (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }), @@ -1814,356 +1828,364 @@ impl { - debug_assert!(val, "compare_exchange failed spuriously?"); - return; - }, - Ok(val) => { - debug_assert!(!val, "compare_exchange succeeded spuriously?"); - // We're the only waiter, as the running process_events may have emptied the - // pending events "long" ago and there are new events for us to process, wait until - // its done and process any leftover events before returning. - _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap()); - self.blocked_event_processors.store(false, Ordering::Release); - } - } + if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return; } - self.update_gossip_backlogged(); - let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed); - - let mut peers_to_disconnect = HashMap::new(); - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); + loop { + self.update_gossip_backlogged(); + let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed); - { - // TODO: There are some DoS attacks here where you can flood someone's outbound send - // buffer by doing things like announcing channels on another node. We should be willing to - // drop optional-ish messages when send buffers get full! + let mut peers_to_disconnect = HashMap::new(); + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); + events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); - let peers_lock = self.peers.read().unwrap(); - let peers = &*peers_lock; - macro_rules! get_peer_for_forwarding { - ($node_id: expr) => { - { - if peers_to_disconnect.get($node_id).is_some() { - // If we've "disconnected" this peer, do not send to it. - continue; - } - let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); - match descriptor_opt { - Some(descriptor) => match peers.get(&descriptor) { - Some(peer_mutex) => { - let peer_lock = peer_mutex.lock().unwrap(); - if !peer_lock.handshake_complete() { + { + // TODO: There are some DoS attacks here where you can flood someone's outbound send + // buffer by doing things like announcing channels on another node. We should be willing to + // drop optional-ish messages when send buffers get full! + + let peers_lock = self.peers.read().unwrap(); + let peers = &*peers_lock; + macro_rules! get_peer_for_forwarding { + ($node_id: expr) => { + { + if peers_to_disconnect.get($node_id).is_some() { + // If we've "disconnected" this peer, do not send to it. + continue; + } + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.get(&descriptor) { + Some(peer_mutex) => { + let peer_lock = peer_mutex.lock().unwrap(); + if !peer_lock.handshake_complete() { + continue; + } + peer_lock + }, + None => { + debug_assert!(false, "Inconsistent peers set state!"); continue; } - peer_lock }, None => { - debug_assert!(false, "Inconsistent peers set state!"); continue; - } - }, - None => { - continue; - }, + }, + } } } } - } - for event in events_generated.drain(..) { - match event { - MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id), - log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); - // TODO: If the peer is gone we should generate a DiscardFunding event - // indicating to the wallet that they should just throw away this funding transaction - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { - log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", - log_pubkey!(node_id), - update_add_htlcs.len(), - update_fulfill_htlcs.len(), - update_fail_htlcs.len(), - log_bytes!(commitment_signed.channel_id)); - let mut peer = get_peer_for_forwarding!(node_id); - for msg in update_add_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fulfill_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fail_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fail_malformed_htlcs { - self.enqueue_message(&mut *peer, msg); - } - if let &Some(ref msg) = update_fee { - self.enqueue_message(&mut *peer, msg); - } - self.enqueue_message(&mut *peer, commitment_signed); - }, - MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendShutdown { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { - log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", - log_pubkey!(node_id), - msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); - }, - MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { - log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); - match self.message_handler.route_handler.handle_channel_announcement(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), - _ => {}, - } - if let Some(msg) = update_msg { + for event in events_generated.drain(..) { + match event { + MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id), + log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); + // TODO: If the peer is gone we should generate a DiscardFunding event + // indicating to the wallet that they should just throw away this funding transaction + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", + log_pubkey!(node_id), + update_add_htlcs.len(), + update_fulfill_htlcs.len(), + update_fail_htlcs.len(), + log_bytes!(commitment_signed.channel_id)); + let mut peer = get_peer_for_forwarding!(node_id); + for msg in update_add_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fulfill_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fail_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fail_malformed_htlcs { + self.enqueue_message(&mut *peer, msg); + } + if let &Some(ref msg) = update_fee { + self.enqueue_message(&mut *peer, msg); + } + self.enqueue_message(&mut *peer, commitment_signed); + }, + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { + log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", + log_pubkey!(node_id), + msg.contents.short_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); + }, + MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { + log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); + match self.message_handler.route_handler.handle_channel_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), + _ => {}, + } + if let Some(msg) = update_msg { + match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + _ => {}, + } + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); match self.message_handler.route_handler.handle_channel_update(&msg) { Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), _ => {}, } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); + match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), + _ => {}, + } + }, + MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { + log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), msg.contents.short_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::HandleError { node_id, action } => { + match action { + msgs::ErrorAction::DisconnectPeer { msg } => { + if let Some(msg) = msg.as_ref() { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), msg.data); + } else { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}", + log_pubkey!(node_id)); + } + // We do not have the peers write lock, so we just store that we're + // about to disconenct the peer and do it after we finish + // processing most messages. + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + peers_to_disconnect.insert(node_id, msg); + }, + msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), msg.data); + // We do not have the peers write lock, so we just store that we're + // about to disconenct the peer and do it after we finish + // processing most messages. + peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg))); + }, + msgs::ErrorAction::IgnoreAndLog(level) => { + log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + }, + msgs::ErrorAction::IgnoreDuplicateGossip => {}, + msgs::ErrorAction::IgnoreError => { + log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + }, + msgs::ErrorAction::SendErrorMessage { ref msg } => { + log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + }, + msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { + log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + }, + } + }, + MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::BroadcastChannelUpdate { msg } => { - log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); - match self.message_handler.route_handler.handle_channel_update(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), - _ => {}, - } - }, - MessageSendEvent::BroadcastNodeAnnouncement { msg } => { - log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); - match self.message_handler.route_handler.handle_node_announcement(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), - _ => {}, + MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { + log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", + log_pubkey!(node_id), + msg.short_channel_ids.len(), + msg.first_blocknum, + msg.number_of_blocks, + msg.sync_complete); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { - log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::HandleError { ref node_id, ref action } => { - match *action { - msgs::ErrorAction::DisconnectPeer { ref msg } => { - // We do not have the peers write lock, so we just store that we're - // about to disconenct the peer and do it after we finish - // processing most messages. - peers_to_disconnect.insert(*node_id, msg.clone()); - }, - msgs::ErrorAction::IgnoreAndLog(level) => { - log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); - }, - msgs::ErrorAction::IgnoreDuplicateGossip => {}, - msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); - }, - msgs::ErrorAction::SendErrorMessage { ref msg } => { - log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { - log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, + MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - } - MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { - log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", - log_pubkey!(node_id), - msg.short_channel_ids.len(), - msg.first_blocknum, - msg.number_of_blocks, - msg.sync_complete); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - } - MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } } - } - for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { - if peers_to_disconnect.get(&node_id).is_some() { continue; } - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); - } + for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { + if peers_to_disconnect.get(&node_id).is_some() { continue; } + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); + } - for (descriptor, peer_mutex) in peers.iter() { - let mut peer = peer_mutex.lock().unwrap(); - if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; } - self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled); + for (descriptor, peer_mutex) in peers.iter() { + let mut peer = peer_mutex.lock().unwrap(); + if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; } + self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled); + } } - } - if !peers_to_disconnect.is_empty() { - let mut peers_lock = self.peers.write().unwrap(); - let peers = &mut *peers_lock; - for (node_id, msg) in peers_to_disconnect.drain() { - // Note that since we are holding the peers *write* lock we can - // remove from node_id_to_descriptor immediately (as no other - // thread can be holding the peer lock if we have the global write - // lock). - - let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - if let Some(mut descriptor) = descriptor_opt { - if let Some(peer_mutex) = peers.remove(&descriptor) { - let mut peer = peer_mutex.lock().unwrap(); - if let Some(msg) = msg { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *peer, &msg); - // This isn't guaranteed to work, but if there is enough free - // room in the send buffer, put the error message there... - self.do_attempt_write_data(&mut descriptor, &mut *peer, false); - } - self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError"); - } else { debug_assert!(false, "Missing connection for peer"); } + if !peers_to_disconnect.is_empty() { + let mut peers_lock = self.peers.write().unwrap(); + let peers = &mut *peers_lock; + for (node_id, msg) in peers_to_disconnect.drain() { + // Note that since we are holding the peers *write* lock we can + // remove from node_id_to_descriptor immediately (as no other + // thread can be holding the peer lock if we have the global write + // lock). + + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + if let Some(mut descriptor) = descriptor_opt { + if let Some(peer_mutex) = peers.remove(&descriptor) { + let mut peer = peer_mutex.lock().unwrap(); + if let Some(msg) = msg { + self.enqueue_message(&mut *peer, &msg); + // This isn't guaranteed to work, but if there is enough free + // room in the send buffer, put the error message there... + self.do_attempt_write_data(&mut descriptor, &mut *peer, false); + } + self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError"); + } else { debug_assert!(false, "Missing connection for peer"); } + } } } + + if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.event_processing_state.store(1, Ordering::Release); + continue; + } + break; } } @@ -3004,4 +3026,53 @@ mod tests { // For (None) assert_eq!(filter_addresses(None), None); } + + #[test] + #[cfg(feature = "std")] + fn test_process_events_multithreaded() { + use std::time::{Duration, Instant}; + // Test that `process_events` getting called on multiple threads doesn't generate too many + // loop iterations. + // Each time `process_events` goes around the loop we call + // `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. + // Because the loop should go around once more after a call which fails to take the + // single-threaded lock, if we write zero to the counter before calling `process_events` we + // should never observe there having been more than 2 loop iterations. + // Further, because the last thread to exit will call `process_events` before returning, we + // should always have at least one count at the end. + let cfg = Arc::new(create_peermgr_cfgs(1)); + // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. + let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap()); + + let exit_flag = Arc::new(AtomicBool::new(false)); + macro_rules! spawn_thread { () => { { + let thread_cfg = Arc::clone(&cfg); + let thread_peer = Arc::clone(&peer); + let thread_exit = Arc::clone(&exit_flag); + std::thread::spawn(move || { + while !thread_exit.load(Ordering::Acquire) { + thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release); + thread_peer.process_events(); + std::thread::sleep(Duration::from_micros(1)); + } + }) + } } } + + let thread_a = spawn_thread!(); + let thread_b = spawn_thread!(); + let thread_c = spawn_thread!(); + + let start_time = Instant::now(); + while start_time.elapsed() < Duration::from_millis(100) { + let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire); + assert!(val <= 2); + std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?! + } + + exit_flag.store(true, Ordering::Release); + thread_a.join().unwrap(); + thread_b.join().unwrap(); + thread_c.join().unwrap(); + assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1); + } }