X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=8c3ab968af20f92e3a544d7d1fea024837bd26e6;hb=d0b8f455fe86d3e55231d35d35dd96693abe2049;hp=11f1206339dde3580f4346ccf3b735c2c44e551b;hpb=02b187856bc7f1d7edd46897e2f983a65a97230c;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 11f12063..8c3ab968 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -46,16 +46,23 @@ use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::sha256::HashEngine as Sha256Engine; use bitcoin::hashes::{HashEngine, Hash}; -/// Handler for BOLT1-compliant messages. +/// A handler provided to [`PeerManager`] for reading and handling custom messages. +/// +/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific +/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the +/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler. +/// +/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md +/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message pub trait CustomMessageHandler: wire::CustomMessageReader { - /// Called with the message type that was received and the buffer to be read. - /// Can return a `MessageHandlingError` if the message could not be handled. + /// Handles the given message sent from `sender_node_id`, possibly producing messages for + /// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`] + /// to send. fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>; - /// Gets the list of pending messages which were generated by the custom message - /// handler, clearing the list in the process. The first tuple element must - /// correspond to the intended recipients node ids. If no connection to one of the - /// specified node does not exist, the message is simply not sent to it. + /// Returns the list of pending messages that were generated by the handler, clearing the list + /// in the process. Each message is paired with the node id of the intended recipient. If no + /// connection to the node exists, then the message is simply not sent. fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>; } @@ -390,7 +397,9 @@ const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; struct Peer { channel_encryptor: PeerChannelEncryptor, - their_node_id: Option, + /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip + /// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field. + their_node_id: Option<(PublicKey, NodeId)>, their_features: Option, their_net_address: Option, @@ -413,6 +422,12 @@ struct Peer { awaiting_pong_timer_tick_intervals: i8, received_message_since_timer_tick: bool, sent_gossip_timestamp_filter: bool, + + /// Indicates we've received a `channel_announcement` since the last time we had + /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a + /// `channel_announcement` at all - we set this unconditionally but unset it every time we + /// check if we're gossip-processing-backlogged). + received_channel_announce_since_backlogged: bool, } impl Peer { @@ -449,8 +464,12 @@ impl Peer { /// Returns whether we should be reading bytes from this peer, based on whether its outbound /// buffer still has space and we don't need to pause reads to get some writes out. - fn should_read(&self) -> bool { - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool { + if !gossip_processing_backlogged { + self.received_channel_announce_since_backlogged = false; + } + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged) } /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's @@ -482,6 +501,10 @@ impl Peer { total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } + + fn set_their_node_id(&mut self, node_id: PublicKey) { + self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id))); + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -563,6 +586,9 @@ pub struct PeerManager PeerManager(&'a Option); +struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>); impl core::fmt::Display for OptionalFromDebugger<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { - if let Some(node_id) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) } + if let Some((node_id, _)) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) } } } @@ -721,6 +747,8 @@ impl Vec { + /// The returned `Option`s will only be `Some` if an address had been previously given via + /// [`Self::new_outbound_connection`] or [`Self::new_inbound_connection`]. + pub fn get_peer_node_ids(&self) -> Vec<(PublicKey, Option)> { let peers = self.peers.read().unwrap(); peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); - if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { + if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() || + p.their_node_id.is_none() { return None; } - p.their_node_id + Some((p.their_node_id.unwrap().0, p.their_net_address.clone())) }).collect() } @@ -752,7 +786,7 @@ impl bool { + peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed)) + } + + fn update_gossip_backlogged(&self) { + let new_state = self.message_handler.route_handler.processing_queue_high(); + let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed); + if prev_state && !new_state { + self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed); + } + } + + fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) { + let mut have_written = false; while !peer.awaiting_write_event { if peer.should_buffer_onion_message() { - if let Some(peer_node_id) = peer.their_node_id { + if let Some((peer_node_id, _)) = peer.their_node_id { if let Some(next_onion_message) = self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) { self.enqueue_message(peer, &next_onion_message); @@ -904,13 +955,23 @@ impl return, + None => { + if force_one_write && !have_written { + if should_read { + let data_sent = descriptor.send_data(&[], should_read); + debug_assert_eq!(data_sent, 0, "Can't write more than no data"); + } + } + return + }, Some(buff) => buff, }; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, peer.should_read()); + let data_sent = descriptor.send_data(pending, should_read); + have_written = true; peer.pending_outbound_buffer_first_msg_offset += data_sent; if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { peer.pending_outbound_buffer_first_msg_offset = 0; @@ -945,7 +1006,7 @@ impl { let mut peer = peer_mutex.lock().unwrap(); peer.awaiting_write_event = false; - self.do_attempt_write_data(descriptor, &mut peer); + self.do_attempt_write_data(descriptor, &mut peer, false); } }; Ok(()) @@ -963,6 +1024,9 @@ impl Result { @@ -979,9 +1043,9 @@ impl(&self, peer: &mut Peer, message: &M) { if is_gossip_msg(message.type_id()) { - log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())); + log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); } else { - log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())) + log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) } peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); @@ -1066,14 +1130,14 @@ impl { - match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) { + match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { hash_map::Entry::Occupied(_) => { - log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap())); + log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event return Err(PeerHandleError{ no_connection_possible: false }) }, hash_map::Entry::Vacant(entry) => { - log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap())); + log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); entry.insert(peer_descriptor.clone()) }, }; @@ -1097,7 +1161,7 @@ impl, message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { - let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages"); + let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; peer_lock.received_message_since_timer_tick = true; // Need an Init as first message @@ -1278,6 +1342,10 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::NodeAnnouncement(msg) => { if self.message_handler.route_handler.handle_node_announcement(&msg) .map_err(|e| -> MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::NodeAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::ChannelUpdate(msg) => { self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg); @@ -1417,6 +1487,7 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelUpdate(msg)); } + self.update_gossip_backlogged(); }, wire::Message::QueryShortChannelIds(msg) => { self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?; @@ -1468,13 +1539,12 @@ impl { let peer = peer_lock.lock().unwrap(); - if let Some(node_id) = peer.their_node_id { + if let Some((node_id, _)) = peer.their_node_id { log_trace!(self.logger, "Handling disconnection of peer {}, with {}future connection to the peer possible.", log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); @@ -1888,7 +1963,7 @@ impl 0 && !peer.received_message_since_timer_tick) + || peer.awaiting_pong_timer_tick_intervals as u64 > + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 + { + descriptors_needing_disconnect.push(descriptor.clone()); + break; + } peer.received_message_since_timer_tick = false; - continue; - } - if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) - || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 - { - descriptors_needing_disconnect.push(descriptor.clone()); - continue; - } - peer.received_message_since_timer_tick = false; + if peer.awaiting_pong_timer_tick_intervals > 0 { + peer.awaiting_pong_timer_tick_intervals += 1; + break; + } - if peer.awaiting_pong_timer_tick_intervals > 0 { - peer.awaiting_pong_timer_tick_intervals += 1; - continue; + peer.awaiting_pong_timer_tick_intervals = 1; + let ping = msgs::Ping { + ponglen: 0, + byteslen: 64, + }; + self.enqueue_message(&mut *peer, &ping); + break; } - - peer.awaiting_pong_timer_tick_intervals = 1; - let ping = msgs::Ping { - ponglen: 0, - byteslen: 64, - }; - self.enqueue_message(&mut *peer, &ping); - self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); + self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled); } } @@ -1978,7 +2061,7 @@ impl(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { - let a_id = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; - let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); - peer_a.new_inbound_connection(fd_a.clone(), None).unwrap(); + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); peer_a.process_events(); @@ -2172,6 +2258,9 @@ mod tests { let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + assert!(peer_a.get_peer_node_ids().contains(&(id_b, Some(addr_b)))); + assert!(peer_b.get_peer_node_ids().contains(&(id_a, Some(addr_a)))); + (fd_a.clone(), fd_b.clone()) }