+// This file is Copyright its original authors, visible in version control
+// history.
+//
+// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
+// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
//! Top level peer message handling and socket handling logic lives here.
//!
//! Instead of actually servicing sockets ourselves we require that you implement the
pending_outbound_buffer_first_msg_offset: 0,
awaiting_write_event: false,
- pending_read_buffer: pending_read_buffer,
+ pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
pending_outbound_buffer_first_msg_offset: 0,
awaiting_write_event: false,
- pending_read_buffer: pending_read_buffer,
+ pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
peer.their_node_id = Some(their_node_id);
insert_node_id!();
- let mut features = InitFeatures::known();
+ let mut features = InitFeatures::known().clear_gossip_queries();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}
}
log_info!(
- self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
+ self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+ if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
if msg.features.supports_unknown_bits() { "present" } else { "none" }
);
}
if !peer.outbound {
- let mut features = InitFeatures::known();
+ let mut features = InitFeatures::known().clear_gossip_queries();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}
// TODO: forward msg along to all our other peers!
}
},
+ wire::Message::QueryShortChannelIds(msg) => {
+ self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::ReplyShortChannelIdsEnd(msg) => {
+ self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::QueryChannelRange(msg) => {
+ self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::ReplyChannelRange(msg) => {
+ self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?;
+ },
+ wire::Message::GossipTimestampFilter(_msg) => {
+ // TODO: handle message
+ },
// Unknown messages:
wire::Message::Unknown(msg_type) if msg_type.is_even() => {
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+ events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
for event in events_generated.drain(..) {
self.do_attempt_write_data(&mut descriptor, peer);
},
}
+ },
+ MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
+ },
+ MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
}
}
}
use bitcoin::secp256k1::Secp256k1;
use bitcoin::secp256k1::key::{SecretKey, PublicKey};
- use rand::{thread_rng, Rng};
-
use std;
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>> {
let mut peers = Vec::new();
- let mut rng = thread_rng();
- let mut ephemeral_bytes = [0; 32];
- rng.fill_bytes(&mut ephemeral_bytes);
-
for i in 0..peer_count {
- let node_id = {
- let mut key_slice = [0;32];
- rng.fill_bytes(&mut key_slice);
- SecretKey::from_slice(&key_slice).unwrap()
- };
+ let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
+ let ephemeral_bytes = [i as u8; 32];
let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
- let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, &cfgs[i].logger);
+ let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger);
peers.push(peer);
}