use std::collections::{HashMap,hash_map,HashSet,LinkedList};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::{cmp,error,hash,fmt};
+use std::{cmp, error, hash, fmt, mem};
use std::ops::Deref;
use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
use bitcoin::hashes::{HashEngine, Hash};
+/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
+/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
+struct IgnoringMessageHandler{}
+impl MessageSendEventsProvider for IgnoringMessageHandler {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
+}
+impl RoutingMessageHandler for IgnoringMessageHandler {
+ fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
+ fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
+ Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
+ fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+ fn sync_routing_table(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
+ fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
+ fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
+}
+impl Deref for IgnoringMessageHandler {
+ type Target = IgnoringMessageHandler;
+ fn deref(&self) -> &Self { self }
+}
+
+/// A dummy struct which implements `ChannelMessageHandler` without having any channels.
+/// You can provide one of these as the route_handler in a MessageHandler.
+struct ErroringMessageHandler {
+ message_queue: Mutex<Vec<MessageSendEvent>>
+}
+impl ErroringMessageHandler {
+ /// Constructs a new ErroringMessageHandler
+ pub fn new() -> Self {
+ Self { message_queue: Mutex::new(Vec::new()) }
+ }
+ fn push_error(&self, node_id: &PublicKey, channel_id: [u8; 32]) {
+ self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError {
+ action: msgs::ErrorAction::SendErrorMessage {
+ msg: msgs::ErrorMessage { channel_id, data: "We do not support channel messages, sorry.".to_owned() },
+ },
+ node_id: node_id.clone(),
+ });
+ }
+}
+impl MessageSendEventsProvider for ErroringMessageHandler {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+ let mut res = Vec::new();
+ mem::swap(&mut res, &mut self.message_queue.lock().unwrap());
+ res
+ }
+}
+impl ChannelMessageHandler for ErroringMessageHandler {
+ // Any messages which are related to a specific channel generate an error message to let the
+ // peer know we don't care about channels.
+ fn handle_open_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+ }
+ fn handle_accept_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+ }
+ fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+ }
+ fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_shutdown(&self, their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
+ ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+ }
+ fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
+ fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {}
+ fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
+}
+impl Deref for ErroringMessageHandler {
+ type Target = ErroringMessageHandler;
+ fn deref(&self) -> &Self { self }
+}
+
/// Provides references to trait impls which handle different types of messages.
pub struct MessageHandler<CM: Deref, RM: Deref> where
CM::Target: ChannelMessageHandler,
RM::Target: RoutingMessageHandler {
/// A message handler which handles messages specific to channels. Usually this is just a
- /// ChannelManager object.
+ /// ChannelManager object or a ErroringMessageHandler.
pub chan_handler: CM,
/// A message handler which handles messages updating our knowledge of the network channel
- /// graph. Usually this is just a NetGraphMsgHandlerMonitor object.
+ /// graph. Usually this is just a NetGraphMsgHandlerMonitor object or an IgnoringMessageHandler.
pub route_handler: RM,
}
}
/// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
-/// generate no further read_event/write_buffer_space_avail calls for the descriptor, only
-/// triggering a single socket_disconnected call (unless it was provided in response to a
-/// new_*_connection event, in which case no such socket_disconnected() must be called and the
-/// socket silently disconencted).
+/// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the
+/// descriptor.
+#[derive(Clone)]
pub struct PeerHandleError {
/// Used to indicate that we probably can't make any future connections to this peer, implying
/// we should go ahead and force-close any channels we have with it.
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>;
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
}}
}
+impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, L> where
+ CM::Target: ChannelMessageHandler,
+ L::Target: Logger {
+ /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message
+ /// handler is used and network graph messages are ignored.
+ ///
+ /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
+ /// cryptographically secure random bytes.
+ ///
+ /// (C-not exported) as we can't export a PeerManager with a dummy route handler
+ pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+ Self::new(MessageHandler {
+ chan_handler: channel_message_handler,
+ route_handler: IgnoringMessageHandler{},
+ }, our_node_secret, ephemeral_random_data, logger)
+ }
+}
+
+impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, L> where
+ RM::Target: RoutingMessageHandler,
+ L::Target: Logger {
+ /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message
+ /// handler is used and messages related to channels will be ignored (or generate error
+ /// messages). Note that some other lightning implementations time-out connections after some
+ /// time if no channel is built with the peer.
+ ///
+ /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
+ /// cryptographically secure random bytes.
+ ///
+ /// (C-not exported) as we can't export a PeerManager with a dummy channel handler
+ pub fn new_routing_only(routing_message_handler: RM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+ Self::new(MessageHandler {
+ chan_handler: ErroringMessageHandler::new(),
+ route_handler: routing_message_handler,
+ }, our_node_secret, ephemeral_random_data, logger)
+ }
+}
+
/// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
/// PeerIds may repeat, but only after socket_disconnected() has been called.
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<Descriptor, CM, RM, L> where
peer.their_node_id = Some(their_node_id);
insert_node_id!();
- let mut features = InitFeatures::known().clear_gossip_queries();
- if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
- features.clear_initial_routing_sync();
- }
-
+ let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
},
// Setup and Control messages:
wire::Message::Init(msg) => {
if msg.features.requires_unknown_bits() {
- log_info!(self.logger, "Peer global features required unknown version bits");
- return Err(PeerHandleError{ no_connection_possible: true }.into());
- }
- if msg.features.requires_unknown_bits() {
- log_info!(self.logger, "Peer local features required unknown version bits");
+ log_info!(self.logger, "Peer features required unknown version bits");
return Err(PeerHandleError{ no_connection_possible: true }.into());
}
if peer.their_features.is_some() {
}
if !peer.outbound {
- let mut features = InitFeatures::known().clear_gossip_queries();
- if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
- features.clear_initial_routing_sync();
- }
-
+ let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
}
+ self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);
+
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
peer.their_features = Some(msg.features);
},
},
wire::Message::Shutdown(msg) => {
- self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
+ self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), peer.their_features.as_ref().unwrap(), &msg);
},
wire::Message::ClosingSigned(msg) => {
self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
}
},
wire::Message::QueryShortChannelIds(msg) => {
- self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?;
+ self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::ReplyShortChannelIdsEnd(msg) => {
- self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?;
+ self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::QueryChannelRange(msg) => {
- self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?;
+ self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::ReplyChannelRange(msg) => {
- self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?;
+ self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::GossipTimestampFilter(_msg) => {
// TODO: handle message
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.do_attempt_write_data(&mut descriptor, peer);
}
+ MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
+ log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+ log_pubkey!(node_id),
+ msg.short_channel_ids.len(),
+ msg.first_blocknum,
+ msg.number_of_blocks,
+ msg.sync_complete);
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
+ }
}
}
};
}
+ /// Disconnect a peer given its node id.
+ ///
+ /// Set no_connection_possible to true to prevent any further connection with this peer,
+ /// force-closing any channels we have with it.
+ ///
+ /// If a peer is connected, this will call `disconnect_socket` on the descriptor for the peer,
+ /// so be careful about reentrancy issues.
+ pub fn disconnect_by_node_id(&self, node_id: PublicKey, no_connection_possible: bool) {
+ let mut peers_lock = self.peers.lock().unwrap();
+ if let Some(mut descriptor) = peers_lock.node_id_to_descriptor.remove(&node_id) {
+ log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
+ peers_lock.peers.remove(&descriptor);
+ peers_lock.peers_needing_send.remove(&descriptor);
+ self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
+ descriptor.disconnect_socket();
+ }
+ }
+
/// This function should be called roughly once every 30 seconds.
/// It will send pings to each peer and disconnect those which did not respond to the last round of pings.
(fd_a.clone(), fd_b.clone())
}
- fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
- let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
- assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
- assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
- (fd_a.clone(), fd_b.clone())
- }
-
#[test]
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
}
-
- #[test]
- fn limit_initial_routing_sync_requests() {
- // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
- {
- let cfgs = create_peermgr_cfgs(2);
- cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
- let peers = create_network(2, &cfgs);
- let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
- let peer_0 = peers[0].peers.lock().unwrap();
- let peer_1 = peers[1].peers.lock().unwrap();
-
- let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
- let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
- assert!(peer_0_features.unwrap().initial_routing_sync());
- assert!(!peer_1_features.unwrap().initial_routing_sync());
- }
-
- // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
- {
- let cfgs = create_peermgr_cfgs(2);
- cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
- let peers = create_network(2, &cfgs);
- let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
- let peer_0 = peers[0].peers.lock().unwrap();
- let peer_1 = peers[1].peers.lock().unwrap();
-
- let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
- let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
- assert!(!peer_0_features.unwrap().initial_routing_sync());
- assert!(peer_1_features.unwrap().initial_routing_sync());
- }
- }
}