X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=48bad2d69e16d59c5dd3d3a1436dc94436254f2a;hb=84d3630dffd31c133ff1b7380505597812f6c12f;hp=34bcda3a009a850b780bfffd6d4c087eb0256b28;hpb=b3be420cfbf5eccfeebdd813a4c64a9e2e563711;p=rust-lightning

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 34bcda3a..48bad2d6 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -32,11 +32,11 @@ use routing::network_graph::NetGraphMsgHandler;
 use prelude::*;
 use io;
 use alloc::collections::LinkedList;
-use alloc::fmt::Debug;
 use sync::{Arc, Mutex};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
+use core::convert::Infallible;
 #[cfg(feature = "std")] use std::error;
 
 use bitcoin::hashes::sha256::Hash as Sha256;
@@ -47,7 +47,7 @@ use bitcoin::hashes::{HashEngine, Hash};
 pub trait CustomMessageHandler: wire::CustomMessageReader {
 	/// Called with the message type that was received and the buffer to be read.
 	/// Can return a `MessageHandlingError` if the message could not be handled.
-	fn handle_custom_message(&self, msg: Self::CustomMessage) -> Result<(), LightningError>;
+	fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;
 
 	/// Gets the list of pending messages which were generated by the custom message
 	/// handler, clearing the list in the process. The first tuple element must
@@ -66,7 +66,6 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
 	fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-	fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
 	fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
 	fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
@@ -81,28 +80,28 @@ impl Deref for IgnoringMessageHandler {
 	fn deref(&self) -> &Self { self }
 }
 
-impl wire::Type for () {
+// Implement Type for Infallible. Note that it cannot be constructed, and thus no method that
+// takes `self` can ever actually be called on it.
+impl wire::Type for Infallible {
 	fn type_id(&self) -> u16 {
-		// We should never call this for `DummyCustomType`
 		unreachable!();
 	}
 }
-
-impl Writeable for () {
+impl Writeable for Infallible {
 	fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
 		unreachable!();
 	}
 }
 
 impl wire::CustomMessageReader for IgnoringMessageHandler {
-	type CustomMessage = ();
+	type CustomMessage = Infallible;
 	fn read<R: io::Read>(&self, _message_type: u16, _buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
 		Ok(None)
 	}
 }
 
 impl CustomMessageHandler for IgnoringMessageHandler {
-	fn handle_custom_message(&self, _msg: Self::CustomMessage) -> Result<(), LightningError> {
+	fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
 		// Since we always return `None` in the read the handle method should never be called.
 		unreachable!();
 	}
@@ -286,6 +285,10 @@ enum InitSyncTracker{
 	NodesSyncing(PublicKey),
 }
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
@@ -293,7 +296,29 @@ enum InitSyncTracker{
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
+
+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the pong.
+///
+/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
+/// including any network delays, outbound traffic, or the same for messages from other peers.
+///
+/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
+
+/// This is the minimum number of messages we expect a peer to be able to handle within one timer
+/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
+/// ensure we don't just fill up our send buffer and leave the peer with too many messages to
+/// process before the next ping.
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
 	channel_encryptor: PeerChannelEncryptor,
@@ -310,7 +335,9 @@ struct Peer {
 
 	sync_status: InitSyncTracker,
 
-	awaiting_pong: bool,
+	msgs_sent_since_pong: usize,
+	awaiting_pong_tick_intervals: i8,
+	received_message_since_timer_tick: bool,
 }
 
 impl Peer {
@@ -470,7 +497,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		CM::Target: ChannelMessageHandler,
 		RM::Target: RoutingMessageHandler,
 		L::Target: Logger,
-		CMH::Target: CustomMessageHandler + wire::CustomMessageReader {
+		CMH::Target: CustomMessageHandler {
 	/// Constructs a new PeerManager with the given message handlers and node_id secret key
 	/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
 	/// cryptographically secure random bytes.
@@ -552,7 +579,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 			sync_status: InitSyncTracker::NoSyncRequested,
 
-			awaiting_pong: false,
+			msgs_sent_since_pong: 0,
+			awaiting_pong_tick_intervals: 0,
+			received_message_since_timer_tick: false,
 		}).is_some() {
 			panic!("PeerManager driver duplicated descriptors!");
 		};
@@ -590,7 +619,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 			sync_status: InitSyncTracker::NoSyncRequested,
 
-			awaiting_pong: false,
+			msgs_sent_since_pong: 0,
+			awaiting_pong_tick_intervals: 0,
+			received_message_since_timer_tick: false,
 		}).is_some() {
 			panic!("PeerManager driver duplicated descriptors!");
 		};
@@ -599,7 +630,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
 		while !peer.awaiting_write_event {
-			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
+			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
 				match peer.sync_status {
 					InitSyncTracker::NoSyncRequested => {},
 					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
@@ -644,6 +675,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					},
 				}
 			}
+			if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
+				self.maybe_send_extra_ping(peer);
+			}
 
 			if {
 				let next_buff = match peer.pending_outbound_buffer.front() {
@@ -719,14 +753,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		}
 	}
 
-	/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
-	fn enqueue_message<M: wire::Type + Writeable + Debug>(&self, peer: &mut Peer, message: &M) {
-		let mut buffer = VecWriter(Vec::new());
-		wire::write(message, &mut buffer).unwrap(); // crash if the write failed
-		let encoded_message = buffer.0;
+	/// Append a message to a peer's pending outbound/write buffer
+	fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+		peer.msgs_sent_since_pong += 1;
+		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+	}
 
+	/// Append a message to a peer's pending outbound/write buffer
+	fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
+		let mut buffer = VecWriter(Vec::with_capacity(2048));
+		wire::write(message, &mut buffer).unwrap(); // crash if the write failed
 		log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
-		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+		self.enqueue_encoded_message(peer, &buffer.0);
 	}
 
 	fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -920,6 +958,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>
 	) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
 		log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+		peer.received_message_since_timer_tick = true;
 
 		// Need an Init as first message
 		if let wire::Message::Init(_) = message {
@@ -983,7 +1022,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				}
 			},
 			wire::Message::Pong(_msg) => {
-				peer.awaiting_pong = false;
+				peer.awaiting_pong_tick_intervals = 0;
+				peer.msgs_sent_since_pong = 0;
 			},
 
 			// Channel messages:
@@ -1087,7 +1127,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
 			},
 			wire::Message::Custom(custom) => {
-				self.custom_message_handler.handle_custom_message(custom)?;
+				self.custom_message_handler.handle_custom_message(custom, &peer.their_node_id.unwrap())?;
 			},
 		};
 		Ok(should_forward)
@@ -1104,7 +1144,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 					continue
 				}
-				if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+				if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+					|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+				{
 					log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 					continue;
 				}
@@ -1115,7 +1157,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 					continue;
 				}
-				peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+				self.enqueue_encoded_message(peer, &encoded_msg);
 			}
 		},
 		wire::Message::NodeAnnouncement(ref msg) => {
@@ -1127,7 +1169,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_node_announcement(msg.contents.node_id) {
 					continue
 				}
-				if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+				if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+					|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+				{
 					log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 					continue;
 				}
@@ -1137,7 +1181,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 					continue;
 				}
-				peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+				self.enqueue_encoded_message(peer, &encoded_msg);
 			}
 		},
 		wire::Message::ChannelUpdate(ref msg) => {
@@ -1149,14 +1193,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 					continue
 				}
-				if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+				if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+					|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+				{
 					log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 					continue;
 				}
 				if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 					continue;
 				}
-				peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+				self.enqueue_encoded_message(peer, &encoded_msg);
 			}
 		},
 		_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
@@ -1170,6 +1216,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
 	/// issues!
 	///
+	/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
+	/// or one of the other clients provided in our language bindings.
+	///
 	/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
 	/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
 	/// [`send_data`]: SocketDescriptor::send_data
@@ -1318,9 +1367,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						let peer = get_peer_for_forwarding!(node_id);
 						peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
 					},
-					MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
-						self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
-					},
 					MessageSendEvent::HandleError { ref node_id, ref action } => {
 						match *action {
 							msgs::ErrorAction::DisconnectPeer { ref msg } => {
@@ -1432,6 +1478,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		}
 	}
 
+	/// Disconnects all currently-connected peers. This is useful on platforms where there may be
+	/// an indication that TCP sockets have stalled even if we weren't around to time them out
+	/// using regular ping/pongs.
+	pub fn disconnect_all_peers(&self) {
+		let mut peers_lock = self.peers.lock().unwrap();
+		let peers = &mut *peers_lock;
+		for (mut descriptor, peer) in peers.peers.drain() {
+			if let Some(node_id) = peer.their_node_id {
+				log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+				peers.node_id_to_descriptor.remove(&node_id);
+				self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+			}
+			descriptor.disconnect_socket();
+		}
+		debug_assert!(peers.node_id_to_descriptor.is_empty());
+	}
+
+	/// This is called when we're blocked on sending additional gossip messages until we receive a
+	/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
+	/// `awaiting_pong_tick_intervals` to a special flag value to indicate this).
+	fn maybe_send_extra_ping(&self, peer: &mut Peer) {
+		if peer.awaiting_pong_tick_intervals == 0 {
+			peer.awaiting_pong_tick_intervals = -1;
+			let ping = msgs::Ping {
+				ponglen: 0,
+				byteslen: 64,
+			};
+			self.enqueue_message(peer, &ping);
+		}
+	}
+
 	/// Send pings to each peer and disconnect those which did not respond to the last round of
 	/// pings.
 	///
@@ -1450,9 +1527,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
 			let peers = &mut peers.peers;
 			let mut descriptors_needing_disconnect = Vec::new();
+			let peer_count = peers.len();
 
 			peers.retain(|descriptor, peer| {
-				if peer.awaiting_pong {
+				if !peer.channel_encryptor.is_ready_for_encryption() {
+					// The peer needs to complete its handshake before we can exchange messages
+					return true;
+				}
+
+				if (peer.awaiting_pong_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+					|| peer.awaiting_pong_tick_intervals as u64 >
+						MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
+				{
 					descriptors_needing_disconnect.push(descriptor.clone());
 					match peer.their_node_id {
 						Some(node_id) => {
@@ -1469,21 +1555,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					return false;
 				}
 
-				if !peer.channel_encryptor.is_ready_for_encryption() {
-					// The peer needs to complete its handshake before we can exchange messages
+				peer.received_message_since_timer_tick = false;
+				if peer.awaiting_pong_tick_intervals == -1 {
+					// Magic value set in `maybe_send_extra_ping`.
+					peer.awaiting_pong_tick_intervals = 1;
+					return true;
+				}
+
+				if peer.awaiting_pong_tick_intervals > 0 {
+					peer.awaiting_pong_tick_intervals += 1;
 					return true;
 				}
 
+				peer.awaiting_pong_tick_intervals = 1;
 				let ping = msgs::Ping {
 					ponglen: 0,
 					byteslen: 64,
 				};
 				self.enqueue_message(peer, &ping);
+				self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
 
-				let mut descriptor_clone = descriptor.clone();
-				self.do_attempt_write_data(&mut descriptor_clone, peer);
-
-				peer.awaiting_pong = true;
 				true
 			});
 
@@ -1642,11 +1733,23 @@ mod tests {
 		// than can fit into a peer's buffer).
 
 		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
 
-		// Make each peer to read the messages that the other peer just wrote to them.
-		peers[0].process_events();
-		peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
-		peers[1].process_events();
-		peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+		// Make each peer read the messages that the other peer just wrote to them. Note that
+		// due to the max-messages-before-ping limits this may take a few iterations to complete.
+		for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
+			peers[0].process_events();
+			let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+			assert!(!b_read_data.is_empty());
+
+			peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+			peers[1].process_events();
+
+			let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+			assert!(!a_read_data.is_empty());
+			peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
+			peers[1].process_events();
+			assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+		}
 
 		// Check that each peer has received the expected number of channel updates and channel
 		// announcements.
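
Example: a downstream `CustomMessageHandler` under the new signature. The diff above threads the sending peer's node id into `handle_custom_message`, and replaces the old `()` placeholder message type with `core::convert::Infallible`, which cannot be constructed, so the ignoring handler's `unreachable!()` arms can truly never run. The sketch below shows how a custom handler might use `sender_node_id` to queue a reply to whichever peer sent the message. It is a minimal sketch, not code from this commit: `EchoMessage`, `EchoHandler`, and type id 32769 are hypothetical (odd, so peers that don't recognize it ignore it, matching the unknown-odd-message handling visible above); the `use` paths and the `get_and_clear_pending_msg` method name are assumed to match roughly this version of the crate; and a `std` build is assumed so `read_to_end` is available.

use std::sync::Mutex;

use bitcoin::secp256k1::PublicKey;
use lightning::io;
use lightning::ln::msgs::{DecodeError, LightningError};
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::ln::wire::{CustomMessageReader, Type};
use lightning::util::ser::{Writeable, Writer};

// Hypothetical message type id from the custom range; odd, so it is safe to ignore.
const ECHO_TYPE_ID: u16 = 32769;

/// A hypothetical application-defined message carrying an opaque payload.
#[derive(Debug)]
struct EchoMessage(Vec<u8>);

impl Type for EchoMessage {
	fn type_id(&self) -> u16 { ECHO_TYPE_ID }
}

impl Writeable for EchoMessage {
	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
		w.write_all(&self.0)
	}
}

/// Echoes every custom message back to whichever peer sent it: exactly the
/// per-sender routing the new `sender_node_id` argument enables.
struct EchoHandler {
	pending: Mutex<Vec<(PublicKey, EchoMessage)>>,
}

impl CustomMessageReader for EchoHandler {
	type CustomMessage = EchoMessage;
	fn read<R: io::Read>(&self, message_type: u16, buffer: &mut R) -> Result<Option<EchoMessage>, DecodeError> {
		// Returning Ok(None) tells the wire layer this isn't one of our messages.
		if message_type != ECHO_TYPE_ID { return Ok(None); }
		let mut data = Vec::new();
		buffer.read_to_end(&mut data).map_err(|_| DecodeError::InvalidValue)?;
		Ok(Some(EchoMessage(data)))
	}
}

impl CustomMessageHandler for EchoHandler {
	fn handle_custom_message(&self, msg: EchoMessage, sender_node_id: &PublicKey) -> Result<(), LightningError> {
		// Queue the reply for the peer that sent the message; the PeerManager
		// drains this queue and routes each entry by the node id in the tuple.
		self.pending.lock().unwrap().push((*sender_node_id, msg));
		Ok(())
	}

	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, EchoMessage)> {
		self.pending.lock().unwrap().split_off(0)
	}
}

A handler like this is what gets passed to `PeerManager::new` as the custom message handler; the `PeerManager` presumably drains the pending queue as it processes events, which is why the trait docs above insist that the first tuple element identify the node the message should be sent to.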