+// This file is Copyright its original authors, visible in version control
+// history.
+//
+// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
+// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
//! Top level peer message handling and socket handling logic lives here.
//!
//! Instead of actually servicing sockets ourselves we require that you implement the
use ln::features::InitFeatures;
use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use ln::msgs::{ChannelMessageHandler, LightningError, RoutingMessageHandler};
use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use util::ser::{VecWriter, Writeable};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
logger: L,
}
+enum MessageHandlingError {
+ PeerHandleError(PeerHandleError),
+ LightningError(LightningError),
+}
+
+impl From<PeerHandleError> for MessageHandlingError {
+ fn from(error: PeerHandleError) -> Self {
+ MessageHandlingError::PeerHandleError(error)
+ }
+}
+
+impl From<LightningError> for MessageHandlingError {
+ fn from(error: LightningError) -> Self {
+ MessageHandlingError::LightningError(error)
+ }
+}
+
macro_rules! encode_msg {
($msg: expr) => {{
let mut buffer = VecWriter(Vec::new());
pending_outbound_buffer_first_msg_offset: 0,
awaiting_write_event: false,
- pending_read_buffer: pending_read_buffer,
+ pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
pending_outbound_buffer_first_msg_offset: 0,
awaiting_write_event: false,
- pending_read_buffer: pending_read_buffer,
+ pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
peer.their_node_id = Some(their_node_id);
insert_node_id!();
- let mut features = InitFeatures::known();
+ let mut features = InitFeatures::known().clear_gossip_queries();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}
}
};
- log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
-
- // Need an Init as first message
- if let wire::Message::Init(_) = message {
- } else if peer.their_features.is_none() {
- log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
- return Err(PeerHandleError{ no_connection_possible: false });
- }
-
- match message {
- // Setup and Control messages:
- wire::Message::Init(msg) => {
- if msg.features.requires_unknown_bits() {
- log_info!(self.logger, "Peer global features required unknown version bits");
- return Err(PeerHandleError{ no_connection_possible: true });
- }
- if msg.features.requires_unknown_bits() {
- log_info!(self.logger, "Peer local features required unknown version bits");
- return Err(PeerHandleError{ no_connection_possible: true });
- }
- if peer.their_features.is_some() {
- return Err(PeerHandleError{ no_connection_possible: false });
- }
-
- log_info!(self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unkown local flags: {}, unknown global flags: {}",
- if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
- if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
- if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
- if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
- if msg.features.supports_unknown_bits() { "present" } else { "none" },
- if msg.features.supports_unknown_bits() { "present" } else { "none" });
-
- if msg.features.initial_routing_sync() {
- peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
- peers.peers_needing_send.insert(peer_descriptor.clone());
- }
- if !msg.features.supports_static_remote_key() {
- log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
- return Err(PeerHandleError{ no_connection_possible: true });
- }
-
- if !peer.outbound {
- let mut features = InitFeatures::known();
- if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
- features.clear_initial_routing_sync();
- }
-
- let resp = msgs::Init { features };
- self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
- }
-
- self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
- peer.their_features = Some(msg.features);
- },
- wire::Message::Error(msg) => {
- let mut data_is_printable = true;
- for b in msg.data.bytes() {
- if b < 32 || b > 126 {
- data_is_printable = false;
- break;
- }
- }
-
- if data_is_printable {
- log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
- } else {
- log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
- }
- self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
- if msg.channel_id == [0; 32] {
- return Err(PeerHandleError{ no_connection_possible: true });
- }
- },
-
- wire::Message::Ping(msg) => {
- if msg.ponglen < 65532 {
- let resp = msgs::Pong { byteslen: msg.ponglen };
- self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
- }
- },
- wire::Message::Pong(_msg) => {
- peer.awaiting_pong = false;
- },
-
- // Channel messages:
- wire::Message::OpenChannel(msg) => {
- self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
- },
- wire::Message::AcceptChannel(msg) => {
- self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
- },
-
- wire::Message::FundingCreated(msg) => {
- self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::FundingSigned(msg) => {
- self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::FundingLocked(msg) => {
- self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
- },
-
- wire::Message::Shutdown(msg) => {
- self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::ClosingSigned(msg) => {
- self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
- },
-
- // Commitment messages:
- wire::Message::UpdateAddHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFulfillHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFailHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFailMalformedHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
- },
-
- wire::Message::CommitmentSigned(msg) => {
- self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::RevokeAndACK(msg) => {
- self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFee(msg) => {
- self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::ChannelReestablish(msg) => {
- self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
- },
-
- // Routing messages:
- wire::Message::AnnouncementSignatures(msg) => {
- self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::ChannelAnnouncement(msg) => {
- let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg));
-
- if should_forward {
- // TODO: forward msg along to all our other peers!
- }
- },
- wire::Message::NodeAnnouncement(msg) => {
- let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg));
-
- if should_forward {
- // TODO: forward msg along to all our other peers!
- }
- },
- wire::Message::ChannelUpdate(msg) => {
- let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg));
-
- if should_forward {
- // TODO: forward msg along to all our other peers!
- }
- },
-
- // Unknown messages:
- wire::Message::Unknown(msg_type) if msg_type.is_even() => {
- log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
- // Fail the channel if message is an even, unknown type as per BOLT #1.
- return Err(PeerHandleError{ no_connection_possible: true });
- },
- wire::Message::Unknown(msg_type) => {
- log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
- },
+ if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){
+ match handling_error {
+ MessageHandlingError::PeerHandleError(e) => { return Err(e) },
+ MessageHandlingError::LightningError(e) => {
+ try_potential_handleerror!(Err(e));
+ },
+ }
}
}
}
Ok(pause_read)
}
+ /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
+ fn handle_message(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> {
+ log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+
+ // Need an Init as first message
+ if let wire::Message::Init(_) = message {
+ } else if peer.their_features.is_none() {
+ log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
+ return Err(PeerHandleError{ no_connection_possible: false }.into());
+ }
+
+ match message {
+ // Setup and Control messages:
+ wire::Message::Init(msg) => {
+ if msg.features.requires_unknown_bits() {
+ log_info!(self.logger, "Peer global features required unknown version bits");
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ if msg.features.requires_unknown_bits() {
+ log_info!(self.logger, "Peer local features required unknown version bits");
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ if peer.their_features.is_some() {
+ return Err(PeerHandleError{ no_connection_possible: false }.into());
+ }
+
+ log_info!(
+ self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
+ if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
+ if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
+ if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+ if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
+ if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
+ if msg.features.supports_unknown_bits() { "present" } else { "none" }
+ );
+
+ if msg.features.initial_routing_sync() {
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+ peers_needing_send.insert(peer_descriptor.clone());
+ }
+ if !msg.features.supports_static_remote_key() {
+ log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+
+ if !peer.outbound {
+ let mut features = InitFeatures::known().clear_gossip_queries();
+ if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
+ features.clear_initial_routing_sync();
+ }
+
+ let resp = msgs::Init { features };
+ self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+ }
+
+ self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
+ peer.their_features = Some(msg.features);
+ },
+ wire::Message::Error(msg) => {
+ let mut data_is_printable = true;
+ for b in msg.data.bytes() {
+ if b < 32 || b > 126 {
+ data_is_printable = false;
+ break;
+ }
+ }
+
+ if data_is_printable {
+ log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
+ } else {
+ log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
+ }
+ self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
+ if msg.channel_id == [0; 32] {
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ },
+
+ wire::Message::Ping(msg) => {
+ if msg.ponglen < 65532 {
+ let resp = msgs::Pong { byteslen: msg.ponglen };
+ self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+ }
+ },
+ wire::Message::Pong(_msg) => {
+ peer.awaiting_pong = false;
+ },
+
+ // Channel messages:
+ wire::Message::OpenChannel(msg) => {
+ self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+ },
+ wire::Message::AcceptChannel(msg) => {
+ self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+ },
+
+ wire::Message::FundingCreated(msg) => {
+ self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::FundingSigned(msg) => {
+ self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::FundingLocked(msg) => {
+ self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ wire::Message::Shutdown(msg) => {
+ self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ClosingSigned(msg) => {
+ self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ // Commitment messages:
+ wire::Message::UpdateAddHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFulfillHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFailHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFailMalformedHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ wire::Message::CommitmentSigned(msg) => {
+ self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::RevokeAndACK(msg) => {
+ self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFee(msg) => {
+ self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ChannelReestablish(msg) => {
+ self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ // Routing messages:
+ wire::Message::AnnouncementSignatures(msg) => {
+ self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ChannelAnnouncement(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::NodeAnnouncement(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::ChannelUpdate(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::QueryShortChannelIds(msg) => {
+ self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::ReplyShortChannelIdsEnd(msg) => {
+ self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::QueryChannelRange(msg) => {
+ self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::ReplyChannelRange(msg) => {
+ self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::GossipTimestampFilter(_msg) => {
+ // TODO: handle message
+ },
+
+ // Unknown messages:
+ wire::Message::Unknown(msg_type) if msg_type.is_even() => {
+ log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
+ // Fail the channel if message is an even, unknown type as per BOLT #1.
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ },
+ wire::Message::Unknown(msg_type) => {
+ log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
+ }
+ };
+ Ok(())
+ }
+
/// Checks for any events generated by our handlers and processes them. Includes sending most
/// response messages as well as messages generated by calls to handler functions directly (eg
/// functions like ChannelManager::process_pending_htlc_forward or send_payment).
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+ events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
for event in events_generated.drain(..) {
self.do_attempt_write_data(&mut descriptor, peer);
},
}
+ },
+ MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
+ },
+ MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
}
}
}
use bitcoin::secp256k1::Secp256k1;
use bitcoin::secp256k1::key::{SecretKey, PublicKey};
- use rand::{thread_rng, Rng};
-
use std;
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>> {
let mut peers = Vec::new();
- let mut rng = thread_rng();
- let mut ephemeral_bytes = [0; 32];
- rng.fill_bytes(&mut ephemeral_bytes);
-
for i in 0..peer_count {
- let node_id = {
- let mut key_slice = [0;32];
- rng.fill_bytes(&mut key_slice);
- SecretKey::from_slice(&key_slice).unwrap()
- };
+ let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
+ let ephemeral_bytes = [i as u8; 32];
let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
- let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, &cfgs[i].logger);
+ let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger);
peers.push(peer);
}