Merge pull request #2009 from TheBlueMatt/2023-02-no-racey-retries
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 37867adff32d164caa1f3fc3e78f65f49ff1e8d3..8c3ab968af20f92e3a544d7d1fea024837bd26e6 100644 (file)
@@ -46,16 +46,23 @@ use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
 use bitcoin::hashes::{HashEngine, Hash};
 
-/// Handler for BOLT1-compliant messages.
+/// A handler provided to [`PeerManager`] for reading and handling custom messages.
+///
+/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific
+/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the
+/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler.
+///
+/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
+/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message
 pub trait CustomMessageHandler: wire::CustomMessageReader {
-       /// Called with the message type that was received and the buffer to be read.
-       /// Can return a `MessageHandlingError` if the message could not be handled.
+       /// Handles the given message sent from `sender_node_id`, possibly producing messages for
+       /// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`]
+       /// to send.
        fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;
 
-       /// Gets the list of pending messages which were generated by the custom message
-       /// handler, clearing the list in the process. The first tuple element must
-       /// correspond to the intended recipients node ids. If no connection to one of the
-       /// specified node does not exist, the message is simply not sent to it.
+       /// Returns the list of pending messages that were generated by the handler, clearing the list
+       /// in the process. Each message is paired with the node id of the intended recipient. If no
+       /// connection to the node exists, then the message is simply not sent.
        fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;
 }
 
@@ -81,6 +88,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
                InitFeatures::empty()
        }
+       fn processing_queue_high(&self) -> bool { false }
 }
 impl OnionMessageProvider for IgnoringMessageHandler {
        fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
@@ -414,6 +422,12 @@ struct Peer {
        awaiting_pong_timer_tick_intervals: i8,
        received_message_since_timer_tick: bool,
        sent_gossip_timestamp_filter: bool,
+
+       /// Indicates we've received a `channel_announcement` since the last time we had
+       /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
+       /// `channel_announcement` at all - we set this unconditionally but unset it every time we
+       /// check if we're gossip-processing-backlogged).
+       received_channel_announce_since_backlogged: bool,
 }
 
 impl Peer {
@@ -450,8 +464,12 @@ impl Peer {
 
        /// Returns whether we should be reading bytes from this peer, based on whether its outbound
        /// buffer still has space and we don't need to pause reads to get some writes out.
-       fn should_read(&self) -> bool {
-               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+       fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
+               if !gossip_processing_backlogged {
+                       self.received_channel_announce_since_backlogged = false;
+               }
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+                       (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
        }
 
        /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
@@ -568,6 +586,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
 
        peer_counter: AtomicCounter,
 
+       gossip_processing_backlogged: AtomicBool,
+       gossip_processing_backlog_lifted: AtomicBool,
+
        node_signer: NS,
 
        logger: L,
@@ -726,6 +747,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        blocked_event_processors: AtomicBool::new(false),
                        ephemeral_key_midstate,
                        peer_counter: AtomicCounter::new(),
+                       gossip_processing_backlogged: AtomicBool::new(false),
+                       gossip_processing_backlog_lifted: AtomicBool::new(false),
                        last_node_announcement_serial: AtomicU32::new(current_time),
                        logger,
                        custom_message_handler,
@@ -805,6 +828,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        awaiting_pong_timer_tick_intervals: 0,
                        received_message_since_timer_tick: false,
                        sent_gossip_timestamp_filter: false,
+
+                       received_channel_announce_since_backlogged: false,
                })).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -852,13 +877,28 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        awaiting_pong_timer_tick_intervals: 0,
                        received_message_since_timer_tick: false,
                        sent_gossip_timestamp_filter: false,
+
+                       received_channel_announce_since_backlogged: false,
                })).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
                Ok(())
        }
 
-       fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
+       fn peer_should_read(&self, peer: &mut Peer) -> bool {
+               peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
+       }
+
+       fn update_gossip_backlogged(&self) {
+               let new_state = self.message_handler.route_handler.processing_queue_high();
+               let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
+               if prev_state && !new_state {
+                       self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
+               }
+       }
+
+       fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) {
+               let mut have_written = false;
                while !peer.awaiting_write_event {
                        if peer.should_buffer_onion_message() {
                                if let Some((peer_node_id, _)) = peer.their_node_id {
@@ -915,13 +955,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                self.maybe_send_extra_ping(peer);
                        }
 
+                       let should_read = self.peer_should_read(peer);
                        let next_buff = match peer.pending_outbound_buffer.front() {
-                               None => return,
+                               None => {
+                                       if force_one_write && !have_written {
+                                               if should_read {
+                                                       let data_sent = descriptor.send_data(&[], should_read);
+                                                       debug_assert_eq!(data_sent, 0, "Can't write more than no data");
+                                               }
+                                       }
+                                       return
+                               },
                                Some(buff) => buff,
                        };
 
                        let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-                       let data_sent = descriptor.send_data(pending, peer.should_read());
+                       let data_sent = descriptor.send_data(pending, should_read);
+                       have_written = true;
                        peer.pending_outbound_buffer_first_msg_offset += data_sent;
                        if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
@@ -956,7 +1006,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        Some(peer_mutex) => {
                                let mut peer = peer_mutex.lock().unwrap();
                                peer.awaiting_write_event = false;
-                               self.do_attempt_write_data(descriptor, &mut peer);
+                               self.do_attempt_write_data(descriptor, &mut peer, false);
                        }
                };
                Ok(())
@@ -974,6 +1024,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
        /// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
        /// send buffer).
        ///
+       /// In order to avoid processing too many messages at once per peer, `data` should be on the
+       /// order of 4KiB.
+       ///
        /// [`send_data`]: SocketDescriptor::send_data
        /// [`process_events`]: PeerManager::process_events
        pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -1203,7 +1256,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        }
                                                }
                                        }
-                                       pause_read = !peer.should_read();
+                                       pause_read = !self.peer_should_read(peer);
 
                                        if let Some(message) = msg_to_handle {
                                                match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1289,6 +1342,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        return Ok(None);
                }
 
+               if let wire::Message::ChannelAnnouncement(ref _msg) = message {
+                       peer_lock.received_channel_announce_since_backlogged = true;
+               }
+
                mem::drop(peer_lock);
 
                if is_gossip_msg(message.type_id()) {
@@ -1415,12 +1472,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                .map_err(|e| -> MessageHandlingError { e.into() })? {
                                        should_forward = Some(wire::Message::ChannelAnnouncement(msg));
                                }
+                               self.update_gossip_backlogged();
                        },
                        wire::Message::NodeAnnouncement(msg) => {
                                if self.message_handler.route_handler.handle_node_announcement(&msg)
                                                .map_err(|e| -> MessageHandlingError { e.into() })? {
                                        should_forward = Some(wire::Message::NodeAnnouncement(msg));
                                }
+                               self.update_gossip_backlogged();
                        },
                        wire::Message::ChannelUpdate(msg) => {
                                self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg);
@@ -1428,6 +1487,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                .map_err(|e| -> MessageHandlingError { e.into() })? {
                                        should_forward = Some(wire::Message::ChannelUpdate(msg));
                                }
+                               self.update_gossip_backlogged();
                        },
                        wire::Message::QueryShortChannelIds(msg) => {
                                self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?;
@@ -1578,6 +1638,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        }
                }
 
+               self.update_gossip_backlogged();
+               let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
+
                let mut peers_to_disconnect = HashMap::new();
                let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
                events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
@@ -1722,10 +1785,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
                                                        _ => {},
                                                }
-                                               match self.message_handler.route_handler.handle_channel_update(&update_msg) {
-                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                               self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None),
-                                                       _ => {},
+                                               if let Some(msg) = update_msg {
+                                                       match self.message_handler.route_handler.handle_channel_update(&msg) {
+                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+                                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+                                                               _ => {},
+                                                       }
                                                }
                                        },
                                        MessageSendEvent::BroadcastChannelUpdate { msg } => {
@@ -1736,6 +1801,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        _ => {},
                                                }
                                        },
+                                       MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
+                                               log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
+                                               match self.message_handler.route_handler.handle_node_announcement(&msg) {
+                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+                                                               self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
+                                                       _ => {},
+                                               }
+                                       },
                                        MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id), msg.contents.short_channel_id);
@@ -1797,7 +1870,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        }
 
                        for (descriptor, peer_mutex) in peers.iter() {
-                               self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+                               let mut peer = peer_mutex.lock().unwrap();
+                               if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+                               self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
                        }
                }
                if !peers_to_disconnect.is_empty() {
@@ -1819,7 +1894,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        self.enqueue_message(&mut *peer, &msg);
                                                        // This isn't guaranteed to work, but if there is enough free
                                                        // room in the send buffer, put the error message there...
-                                                       self.do_attempt_write_data(&mut descriptor, &mut *peer);
+                                                       self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
                                                } else {
                                                        log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
                                                }
@@ -1927,8 +2002,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                {
                        let peers_lock = self.peers.read().unwrap();
 
+                       self.update_gossip_backlogged();
+                       let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
+
                        for (descriptor, peer_mutex) in peers_lock.iter() {
                                let mut peer = peer_mutex.lock().unwrap();
+                               if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+
                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
                                        // The peer needs to complete its handshake before we can exchange messages. We
                                        // give peers one timer tick to complete handshake, reusing
@@ -1942,34 +2022,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        continue;
                                }
 
-                               if peer.awaiting_pong_timer_tick_intervals == -1 {
-                                       // Magic value set in `maybe_send_extra_ping`.
-                                       peer.awaiting_pong_timer_tick_intervals = 1;
+                               loop { // Used as a `goto` to skip writing a Ping message.
+                                       if peer.awaiting_pong_timer_tick_intervals == -1 {
+                                               // Magic value set in `maybe_send_extra_ping`.
+                                               peer.awaiting_pong_timer_tick_intervals = 1;
+                                               peer.received_message_since_timer_tick = false;
+                                               break;
+                                       }
+
+                                       if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+                                               || peer.awaiting_pong_timer_tick_intervals as u64 >
+                                                       MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
+                                       {
+                                               descriptors_needing_disconnect.push(descriptor.clone());
+                                               break;
+                                       }
                                        peer.received_message_since_timer_tick = false;
-                                       continue;
-                               }
 
-                               if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
-                                       || peer.awaiting_pong_timer_tick_intervals as u64 >
-                                               MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
-                               {
-                                       descriptors_needing_disconnect.push(descriptor.clone());
-                                       continue;
-                               }
-                               peer.received_message_since_timer_tick = false;
+                                       if peer.awaiting_pong_timer_tick_intervals > 0 {
+                                               peer.awaiting_pong_timer_tick_intervals += 1;
+                                               break;
+                                       }
 
-                               if peer.awaiting_pong_timer_tick_intervals > 0 {
-                                       peer.awaiting_pong_timer_tick_intervals += 1;
-                                       continue;
+                                       peer.awaiting_pong_timer_tick_intervals = 1;
+                                       let ping = msgs::Ping {
+                                               ponglen: 0,
+                                               byteslen: 64,
+                                       };
+                                       self.enqueue_message(&mut *peer, &ping);
+                                       break;
                                }
-
-                               peer.awaiting_pong_timer_tick_intervals = 1;
-                               let ping = msgs::Ping {
-                                       ponglen: 0,
-                                       byteslen: 64,
-                               };
-                               self.enqueue_message(&mut *peer, &ping);
-                               self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
+                               self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled);
                        }
                }