X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=758e16b5a356cdeb02947d30ae7bd73996a7f0b9;hb=af58b09ab80e45b225274c9004a0c98a95b6aad7;hp=68751f30ef11b2b83d507fd4f84c5343405288d8;hpb=401d03599d2ad5b7a446eeee2ccb670b47636895;p=rust-lightning

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 68751f30..758e16b5 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -101,7 +101,7 @@ impl wire::CustomMessageReader for IgnoringMessageHandler {
 }
 
 impl CustomMessageHandler for IgnoringMessageHandler {
-	fn handle_custom_message(&self, _msg: Self::CustomMessage, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
+	fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
 		// Since we always return `None` in the read the handle method should never be called.
 		unreachable!();
 	}
@@ -285,6 +285,10 @@ enum InitSyncTracker{
 	NodesSyncing(PublicKey),
 }
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
@@ -292,7 +296,7 @@ enum InitSyncTracker{
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
 struct Peer {
 	channel_encryptor: PeerChannelEncryptor,
@@ -718,14 +722,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		}
 	}
 
-	/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
+	/// Append a message to a peer's pending outbound/write buffer
+	fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+	}
+
+	/// Append a message to a peer's pending outbound/write buffer
 	fn enqueue_message<M: wire::Type + Writeable + Debug>(&self, peer: &mut Peer, message: &M) {
 		let mut buffer = VecWriter(Vec::with_capacity(2048));
 		wire::write(message, &mut buffer).unwrap(); // crash if the write failed
-		let encoded_message = buffer.0;
-
 		log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
-		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+		self.enqueue_encoded_message(peer, &buffer.0);
 	}
 
 	fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -1114,7 +1121,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 						continue;
 					}
-					peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+					self.enqueue_encoded_message(peer, &encoded_msg);
 				}
 			},
 			wire::Message::NodeAnnouncement(ref msg) => {
@@ -1136,7 +1143,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 						continue;
 					}
-					peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+					self.enqueue_encoded_message(peer, &encoded_msg);
 				}
 			},
 			wire::Message::ChannelUpdate(ref msg) => {
@@ -1155,7 +1162,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 						continue;
 					}
-					peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+					self.enqueue_encoded_message(peer, &encoded_msg);
 				}
 			},
 			_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
@@ -1431,6 +1438,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		}
 	}
 
+	/// Disconnects all currently-connected peers. This is useful on platforms where there may be
+	/// an indication that TCP sockets have stalled even if we weren't around to time them out
+	/// using regular ping/pongs.
+	pub fn disconnect_all_peers(&self) {
+		let mut peers_lock = self.peers.lock().unwrap();
+		let peers = &mut *peers_lock;
+		for (mut descriptor, peer) in peers.peers.drain() {
+			if let Some(node_id) = peer.their_node_id {
+				log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+				peers.node_id_to_descriptor.remove(&node_id);
+				self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+			}
+			descriptor.disconnect_socket();
+		}
+		debug_assert!(peers.node_id_to_descriptor.is_empty());
+	}
+
 	/// Send pings to each peer and disconnect those which did not respond to the last round of
 	/// pings.
 	///
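
Usage note (appended below the diff, not part of the patch itself): the rewritten OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP still evaluates to 10 * 2 = 20, so the gossip-drop threshold is unchanged; deriving it via FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO only makes its relationship to OUTBOUND_BUFFER_LIMIT_READ_PAUSE explicit. Below is a minimal sketch of how an application embedding PeerManager might drive the new disconnect_all_peers; `AppPeerManager` and `on_network_transition` are hypothetical names for the application's own glue, not APIs introduced by this patch.

    // Hypothetical application glue. `AppPeerManager` stands in for the
    // application's fully-parameterized PeerManager<Descriptor, CM, RM, L, CMH>
    // type alias; only `disconnect_all_peers` itself comes from this patch.
    fn on_network_transition(peer_manager: &AppPeerManager) {
        // On mobile platforms, TCP sockets often stall silently while the app
        // is suspended, so the regular ping/pong timer never observes the
        // failure. Dropping all peers and reconnecting beats waiting for the
        // kernel to time the sockets out.
        peer_manager.disconnect_all_peers();
        // ...then re-establish outbound connections as usual, e.g. via
        // lightning-net-tokio's connect_outbound.
    }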