use prelude::*;
use io;
use alloc::collections::LinkedList;
-use alloc::fmt::Debug;
use sync::{Arc, Mutex};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
+use core::convert::Infallible;
#[cfg(feature = "std")] use std::error;
use bitcoin::hashes::sha256::Hash as Sha256;
fn deref(&self) -> &Self { self }
}
-impl wire::Type for () {
+// Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
+// method that takes self for it.
+impl wire::Type for Infallible {
fn type_id(&self) -> u16 {
- // We should never call this for `DummyCustomType`
unreachable!();
}
}
-
-impl Writeable for () {
+impl Writeable for Infallible {
fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
unreachable!();
}
}
impl wire::CustomMessageReader for IgnoringMessageHandler {
- type CustomMessage = ();
+ type CustomMessage = Infallible;
fn read<R: io::Read>(&self, _message_type: u16, _buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
Ok(None)
}
}
impl CustomMessageHandler for IgnoringMessageHandler {
- fn handle_custom_message(&self, _msg: Self::CustomMessage, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
+ fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
// Since we always return `None` in the read the handle method should never be called.
unreachable!();
}
NodesSyncing(PublicKey),
}
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
struct Peer {
channel_encryptor: PeerChannelEncryptor,
CM::Target: ChannelMessageHandler,
RM::Target: RoutingMessageHandler,
L::Target: Logger,
- CMH::Target: CustomMessageHandler + wire::CustomMessageReader {
+ CMH::Target: CustomMessageHandler {
/// Constructs a new PeerManager with the given message handlers and node_id secret key
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
}
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
- fn enqueue_message<M: wire::Type + Writeable + Debug>(&self, peer: &mut Peer, message: &M) {
+ fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let mut buffer = VecWriter(Vec::with_capacity(2048));
wire::write(message, &mut buffer).unwrap(); // crash if the write failed
let encoded_message = buffer.0;
}
}
+ /// Disconnects all currently-connected peers. This is useful on platforms where there may be
+ /// an indication that TCP sockets have stalled even if we weren't around to time them out
+ /// using regular ping/pongs.
+ pub fn disconnect_all_peers(&self) {
+ let mut peers_lock = self.peers.lock().unwrap();
+ let peers = &mut *peers_lock;
+ for (mut descriptor, peer) in peers.peers.drain() {
+ if let Some(node_id) = peer.their_node_id {
+ log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+ peers.node_id_to_descriptor.remove(&node_id);
+ self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ }
+ descriptor.disconnect_socket();
+ }
+ debug_assert!(peers.node_id_to_descriptor.is_empty());
+ }
+
/// Send pings to each peer and disconnect those which did not respond to the last round of
/// pings.
///