X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=44d08030062fb47b337b879901b6967a73776d58;hb=85c8410f897d21268da7242d420c292a41c1a6ba;hp=669ce7782e9a31a82405f37d3919f5e7bf06db41;hpb=933ae3470309f21ef7537ffbcdc42070d60e1e74;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 669ce778..44d08030 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -8,27 +8,33 @@ use secp256k1::key::{SecretKey,PublicKey}; +use ln::features::InitFeatures; use ln::msgs; -use util::ser::{Writeable, Writer, Readable}; +use ln::msgs::ChannelMessageHandler; +use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; +use util::ser::VecWriter; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; +use ln::wire; +use ln::wire::Encode; use util::byte_utils; -use util::events::{MessageSendEvent}; +use util::events::{MessageSendEvent, MessageSendEventsProvider}; use util::logger::Logger; use std::collections::{HashMap,hash_map,HashSet,LinkedList}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cmp,error,hash,fmt}; +use std::ops::Deref; use bitcoin_hashes::sha256::Hash as Sha256; use bitcoin_hashes::sha256::HashEngine as Sha256Engine; use bitcoin_hashes::{HashEngine, Hash}; /// Provides references to trait impls which handle different types of messages. -pub struct MessageHandler { +pub struct MessageHandler where CM::Target: msgs::ChannelMessageHandler { /// A message handler which handles messages specific to channels. Usually this is just a /// ChannelManager object. - pub chan_handler: Arc, + pub chan_handler: CM, /// A message handler which handles messages updating our knowledge of the network channel /// graph. Usually this is just a Router object. pub route_handler: Arc, @@ -103,8 +109,7 @@ struct Peer { channel_encryptor: PeerChannelEncryptor, outbound: bool, their_node_id: Option, - their_global_features: Option, - their_local_features: Option, + their_features: Option, pending_outbound_buffer: LinkedList>, pending_outbound_buffer_first_msg_offset: usize, @@ -115,6 +120,8 @@ struct Peer { pending_read_is_header: bool, sync_status: InitSyncTracker, + + awaiting_pong: bool, } impl Peer { @@ -141,20 +148,6 @@ struct PeerHolder { /// Only add to this set when noise completes: node_id_to_descriptor: HashMap, } -struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> { - peers: &'a mut HashMap, - peers_needing_send: &'a mut HashSet, - node_id_to_descriptor: &'a mut HashMap, -} -impl PeerHolder { - fn borrow_parts(&mut self) -> MutPeerHolder { - MutPeerHolder { - peers: &mut self.peers, - peers_needing_send: &mut self.peers_needing_send, - node_id_to_descriptor: &mut self.node_id_to_descriptor, - } - } -} #[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] fn _check_usize_is_32_or_64() { @@ -162,10 +155,31 @@ fn _check_usize_is_32_or_64() { unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); } } +/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. +/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static +/// lifetimes). Other times you can afford a reference, which is more efficient, in which case +/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents +/// issues such as overly long function definitions. +pub type SimpleArcPeerManager = Arc>>; + +/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference +/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't +/// need a PeerManager with a static lifetime. You'll need a static lifetime in cases such as +/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes). +/// But if this is not necessary, using a reference is more efficient. Defining these type aliases +/// helps with issues such as long function definitions. +pub type SimpleRefPeerManager<'a, SD, M> = PeerManager>; + /// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket /// events into messages which it passes on to its MessageHandlers. -pub struct PeerManager { - message_handler: MessageHandler, +/// +/// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager +/// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but +/// essentially you should default to using a SimpleRefPeerManager, and use a +/// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when +/// you're using lightning-net-tokio. +pub struct PeerManager where CM::Target: msgs::ChannelMessageHandler { + message_handler: MessageHandler, peers: Mutex>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, @@ -175,40 +189,24 @@ pub struct PeerManager { peer_counter_low: AtomicUsize, peer_counter_high: AtomicUsize, - initial_syncs_sent: AtomicUsize, logger: Arc, } -struct VecWriter(Vec); -impl Writer for VecWriter { - fn write_all(&mut self, buf: &[u8]) -> Result<(), ::std::io::Error> { - self.0.extend_from_slice(buf); - Ok(()) - } - fn size_hint(&mut self, size: usize) { - self.0.reserve_exact(size); - } -} - macro_rules! encode_msg { - ($msg: expr, $msg_code: expr) => {{ - let mut msg = VecWriter(Vec::new()); - ($msg_code as u16).write(&mut msg).unwrap(); - $msg.write(&mut msg).unwrap(); - msg.0 + ($msg: expr) => {{ + let mut buffer = VecWriter(Vec::new()); + wire::write($msg, &mut buffer).unwrap(); + buffer.0 }} } -//TODO: Really should do something smarter for this -const INITIAL_SYNCS_TO_SEND: usize = 5; - /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds. /// PeerIds may repeat, but only after disconnect_event() has been called. -impl PeerManager { +impl PeerManager where CM::Target: msgs::ChannelMessageHandler { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: Arc) -> PeerManager { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: Arc) -> PeerManager { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -223,7 +221,6 @@ impl PeerManager { ephemeral_key_midstate, peer_counter_low: AtomicUsize::new(0), peer_counter_high: AtomicUsize::new(0), - initial_syncs_sent: AtomicUsize::new(0), logger, } } @@ -236,7 +233,7 @@ impl PeerManager { pub fn get_peer_node_ids(&self) -> Vec { let peers = self.peers.lock().unwrap(); peers.peers.values().filter_map(|p| { - if !p.channel_encryptor.is_ready_for_encryption() || p.their_global_features.is_none() { + if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { return None; } p.their_node_id @@ -274,8 +271,7 @@ impl PeerManager { channel_encryptor: peer_encryptor, outbound: true, their_node_id: None, - their_global_features: None, - their_local_features: None, + their_features: None, pending_outbound_buffer: LinkedList::new(), pending_outbound_buffer_first_msg_offset: 0, @@ -286,6 +282,8 @@ impl PeerManager { pending_read_is_header: false, sync_status: InitSyncTracker::NoSyncRequested, + + awaiting_pong: false, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -310,8 +308,7 @@ impl PeerManager { channel_encryptor: peer_encryptor, outbound: false, their_node_id: None, - their_global_features: None, - their_local_features: None, + their_features: None, pending_outbound_buffer: LinkedList::new(), pending_outbound_buffer_first_msg_offset: 0, @@ -322,6 +319,8 @@ impl PeerManager { pending_read_is_header: false, sync_status: InitSyncTracker::NoSyncRequested, + + awaiting_pong: false, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -330,10 +329,10 @@ impl PeerManager { fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { macro_rules! encode_and_send_msg { - ($msg: expr, $msg_code: expr) => { + ($msg: expr) => { { - log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg)[..])); } } } @@ -346,9 +345,9 @@ impl PeerManager { let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8; let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps); for &(ref announce, ref update_a, ref update_b) in all_messages.iter() { - encode_and_send_msg!(announce, 256); - encode_and_send_msg!(update_a, 258); - encode_and_send_msg!(update_b, 258); + encode_and_send_msg!(announce); + encode_and_send_msg!(update_a); + encode_and_send_msg!(update_b); peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1); } if all_messages.is_empty() || all_messages.len() != steps as usize { @@ -359,7 +358,7 @@ impl PeerManager { let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps); for msg in all_messages.iter() { - encode_and_send_msg!(msg, 256); + encode_and_send_msg!(msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); } if all_messages.is_empty() || all_messages.len() != steps as usize { @@ -371,7 +370,7 @@ impl PeerManager { let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps); for msg in all_messages.iter() { - encode_and_send_msg!(msg, 256); + encode_and_send_msg!(msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); } if all_messages.is_empty() || all_messages.len() != steps as usize { @@ -447,7 +446,7 @@ impl PeerManager { fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: Vec) -> Result { let pause_read = { let mut peers_lock = self.peers.lock().unwrap(); - let peers = peers_lock.borrow_parts(); + let peers = &mut *peers_lock; let pause_read = match peers.peers.get_mut(peer_descriptor) { None => panic!("Descriptor for read_event is not already known to PeerManager"), Some(peer) => { @@ -467,10 +466,10 @@ impl PeerManager { peer.pending_read_buffer_pos = 0; macro_rules! encode_and_send_msg { - ($msg: expr, $msg_code: expr) => { + ($msg: expr) => { { - log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + log_trace!(self, "Encoding and sending message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&$msg)[..])); peers.peers_needing_send.insert(peer_descriptor.clone()); } } @@ -493,7 +492,7 @@ impl PeerManager { }, msgs::ErrorAction::SendErrorMessage { msg } => { log_trace!(self, "Got Err handling message, sending Error message because {}", e.err); - encode_and_send_msg!(msg, 17); + encode_and_send_msg!(msg); continue; }, } @@ -502,37 +501,6 @@ impl PeerManager { } } - macro_rules! try_potential_decodeerror { - ($thing: expr) => { - match $thing { - Ok(x) => x, - Err(e) => { - match e { - msgs::DecodeError::UnknownVersion => return Err(PeerHandleError{ no_connection_possible: false }), - msgs::DecodeError::UnknownRequiredFeature => { - log_debug!(self, "Got a channel/node announcement with an known required feature flag, you may want to update!"); - continue; - }, - msgs::DecodeError::InvalidValue => { - log_debug!(self, "Got an invalid value while deserializing message"); - return Err(PeerHandleError{ no_connection_possible: false }); - }, - msgs::DecodeError::ShortRead => { - log_debug!(self, "Deserialization failed due to shortness of message"); - return Err(PeerHandleError{ no_connection_possible: false }); - }, - msgs::DecodeError::ExtraAddressesPerType => { - log_debug!(self, "Error decoding message, ignoring due to lnd spec incompatibility. See https://github.com/lightningnetwork/lnd/issues/1407"); - continue; - }, - msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError{ no_connection_possible: false }), - msgs::DecodeError::Io(_) => return Err(PeerHandleError{ no_connection_possible: false }), - } - } - }; - } - } - macro_rules! insert_node_id { () => { match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) { @@ -564,15 +532,13 @@ impl PeerManager { peer.their_node_id = Some(their_node_id); insert_node_id!(); - let mut local_features = msgs::LocalFeatures::new(); - if self.initial_syncs_sent.load(Ordering::Acquire) < INITIAL_SYNCS_TO_SEND { - self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel); - local_features.set_initial_routing_sync(); + let mut features = InitFeatures::supported(); + if self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) { + features.set_initial_routing_sync(); } - encode_and_send_msg!(msgs::Init { - global_features: msgs::GlobalFeatures::new(), - local_features, - }, 16); + + let resp = msgs::Init { features }; + encode_and_send_msg!(resp); }, NextNoiseStep::ActThree => { let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..])); @@ -598,61 +564,85 @@ impl PeerManager { peer.pending_read_buffer = [0; 18].to_vec(); peer.pending_read_is_header = true; - let msg_type = byte_utils::slice_to_be16(&msg_data[0..2]); - log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap())); - if msg_type != 16 && peer.their_global_features.is_none() { - // Need an init message as first message + let mut reader = ::std::io::Cursor::new(&msg_data[..]); + let message_result = wire::read(&mut reader); + let message = match message_result { + Ok(x) => x, + Err(e) => { + match e { + msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::UnknownRequiredFeature => { + log_debug!(self, "Got a channel/node announcement with an known required feature flag, you may want to update!"); + continue; + } + msgs::DecodeError::InvalidValue => { + log_debug!(self, "Got an invalid value while deserializing message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + msgs::DecodeError::ShortRead => { + log_debug!(self, "Deserialization failed due to shortness of message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + msgs::DecodeError::ExtraAddressesPerType => { + log_debug!(self, "Error decoding message, ignoring due to lnd spec incompatibility. See https://github.com/lightningnetwork/lnd/issues/1407"); + continue; + } + msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }), + } + } + }; + + log_trace!(self, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + + // Need an Init as first message + if let wire::Message::Init(_) = message { + } else if peer.their_features.is_none() { log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); return Err(PeerHandleError{ no_connection_possible: false }); } - let mut reader = ::std::io::Cursor::new(&msg_data[2..]); - match msg_type { - // Connection control: - 16 => { - let msg = try_potential_decodeerror!(msgs::Init::read(&mut reader)); - if msg.global_features.requires_unknown_bits() { + + match message { + // Setup and Control messages: + wire::Message::Init(msg) => { + if msg.features.requires_unknown_bits() { log_info!(self, "Peer global features required unknown version bits"); return Err(PeerHandleError{ no_connection_possible: true }); } - if msg.local_features.requires_unknown_bits() { + if msg.features.requires_unknown_bits() { log_info!(self, "Peer local features required unknown version bits"); return Err(PeerHandleError{ no_connection_possible: true }); } - if peer.their_global_features.is_some() { + if peer.their_features.is_some() { return Err(PeerHandleError{ no_connection_possible: false }); } log_info!(self, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, unkown local flags: {}, unknown global flags: {}", - if msg.local_features.supports_data_loss_protect() { "supported" } else { "not supported"}, - if msg.local_features.initial_routing_sync() { "requested" } else { "not requested" }, - if msg.local_features.supports_upfront_shutdown_script() { "supported" } else { "not supported"}, - if msg.local_features.supports_unknown_bits() { "present" } else { "none" }, - if msg.global_features.supports_unknown_bits() { "present" } else { "none" }); + if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"}, + if msg.features.initial_routing_sync() { "requested" } else { "not requested" }, + if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"}, + if msg.features.supports_unknown_bits() { "present" } else { "none" }, + if msg.features.supports_unknown_bits() { "present" } else { "none" }); - if msg.local_features.initial_routing_sync() { + if msg.features.initial_routing_sync() { peer.sync_status = InitSyncTracker::ChannelsSyncing(0); peers.peers_needing_send.insert(peer_descriptor.clone()); } - peer.their_global_features = Some(msg.global_features); - peer.their_local_features = Some(msg.local_features); if !peer.outbound { - let mut local_features = msgs::LocalFeatures::new(); - if self.initial_syncs_sent.load(Ordering::Acquire) < INITIAL_SYNCS_TO_SEND { - self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel); - local_features.set_initial_routing_sync(); + let mut features = InitFeatures::supported(); + if self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) { + features.set_initial_routing_sync(); } - encode_and_send_msg!(msgs::Init { - global_features: msgs::GlobalFeatures::new(), - local_features, - }, 16); + let resp = msgs::Init { features }; + encode_and_send_msg!(resp); } - self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()); + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); + peer.their_features = Some(msg.features); }, - 17 => { - let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader)); + wire::Message::Error(msg) => { let mut data_is_printable = true; for b in msg.data.bytes() { if b < 32 || b > 126 { @@ -672,117 +662,100 @@ impl PeerManager { } }, - 18 => { - let msg = try_potential_decodeerror!(msgs::Ping::read(&mut reader)); + wire::Message::Ping(msg) => { if msg.ponglen < 65532 { let resp = msgs::Pong { byteslen: msg.ponglen }; - encode_and_send_msg!(resp, 19); + encode_and_send_msg!(resp); } }, - 19 => { - try_potential_decodeerror!(msgs::Pong::read(&mut reader)); + wire::Message::Pong(_msg) => { + peer.awaiting_pong = false; }, - // Channel control: - 32 => { - let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader)); - self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_local_features.clone().unwrap(), &msg); + // Channel messages: + wire::Message::OpenChannel(msg) => { + self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); }, - 33 => { - let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader)); - self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_local_features.clone().unwrap(), &msg); + wire::Message::AcceptChannel(msg) => { + self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); }, - 34 => { - let msg = try_potential_decodeerror!(msgs::FundingCreated::read(&mut reader)); + wire::Message::FundingCreated(msg) => { self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); }, - 35 => { - let msg = try_potential_decodeerror!(msgs::FundingSigned::read(&mut reader)); + wire::Message::FundingSigned(msg) => { self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); }, - 36 => { - let msg = try_potential_decodeerror!(msgs::FundingLocked::read(&mut reader)); + wire::Message::FundingLocked(msg) => { self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); }, - 38 => { - let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader)); + wire::Message::Shutdown(msg) => { self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg); }, - 39 => { - let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader)); + wire::Message::ClosingSigned(msg) => { self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); }, - 128 => { - let msg = try_potential_decodeerror!(msgs::UpdateAddHTLC::read(&mut reader)); + // Commitment messages: + wire::Message::UpdateAddHTLC(msg) => { self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); }, - 130 => { - let msg = try_potential_decodeerror!(msgs::UpdateFulfillHTLC::read(&mut reader)); + wire::Message::UpdateFulfillHTLC(msg) => { self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); }, - 131 => { - let msg = try_potential_decodeerror!(msgs::UpdateFailHTLC::read(&mut reader)); + wire::Message::UpdateFailHTLC(msg) => { self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); }, - 135 => { - let msg = try_potential_decodeerror!(msgs::UpdateFailMalformedHTLC::read(&mut reader)); + wire::Message::UpdateFailMalformedHTLC(msg) => { self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); }, - 132 => { - let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader)); + wire::Message::CommitmentSigned(msg) => { self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); }, - 133 => { - let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader)); + wire::Message::RevokeAndACK(msg) => { self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); }, - 134 => { - let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader)); + wire::Message::UpdateFee(msg) => { self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); }, - 136 => { - let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader)); + wire::Message::ChannelReestablish(msg) => { self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); }, - // Routing control: - 259 => { - let msg = try_potential_decodeerror!(msgs::AnnouncementSignatures::read(&mut reader)); + // Routing messages: + wire::Message::AnnouncementSignatures(msg) => { self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); }, - 256 => { - let msg = try_potential_decodeerror!(msgs::ChannelAnnouncement::read(&mut reader)); + wire::Message::ChannelAnnouncement(msg) => { let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg)); if should_forward { // TODO: forward msg along to all our other peers! } }, - 257 => { - let msg = try_potential_decodeerror!(msgs::NodeAnnouncement::read(&mut reader)); + wire::Message::NodeAnnouncement(msg) => { let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg)); if should_forward { // TODO: forward msg along to all our other peers! } }, - 258 => { - let msg = try_potential_decodeerror!(msgs::ChannelUpdate::read(&mut reader)); + wire::Message::ChannelUpdate(msg) => { let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg)); if should_forward { // TODO: forward msg along to all our other peers! } }, - _ => { - if (msg_type & 1) == 0 { - return Err(PeerHandleError{ no_connection_possible: true }); - } + + // Unknown messages: + wire::Message::Unknown(msg_type) if msg_type.is_even() => { + // Fail the channel if message is an even, unknown type as per BOLT #1. + return Err(PeerHandleError{ no_connection_possible: true }); }, + wire::Message::Unknown(_) => {}, } } } @@ -813,7 +786,7 @@ impl PeerManager { let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); let mut peers_lock = self.peers.lock().unwrap(); - let peers = peers_lock.borrow_parts(); + let peers = &mut *peers_lock; for event in events_generated.drain(..) { macro_rules! get_peer_for_forwarding { ($node_id: expr, $handle_no_such_peer: block) => { @@ -827,7 +800,7 @@ impl PeerManager { }; match peers.peers.get_mut(&descriptor) { Some(peer) => { - if peer.their_global_features.is_none() { + if peer.their_features.is_none() { $handle_no_such_peer; continue; } @@ -846,7 +819,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Drop the pending channel? (or just let it timeout, but that sucks) }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { @@ -856,7 +829,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Drop the pending channel? (or just let it timeout, but that sucks) }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { @@ -868,7 +841,7 @@ impl PeerManager { //TODO: generate a DiscardFunding event indicating to the wallet that //they should just throw away this funding transaction }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { @@ -879,7 +852,7 @@ impl PeerManager { //TODO: generate a DiscardFunding event indicating to the wallet that //they should just throw away this funding transaction }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { @@ -889,7 +862,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { @@ -900,7 +873,7 @@ impl PeerManager { //TODO: generate a DiscardFunding event indicating to the wallet that //they should just throw away this funding transaction }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { @@ -914,21 +887,21 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); for msg in update_add_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 128))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } for msg in update_fulfill_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 130))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } for msg in update_fail_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 131))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } for msg in update_fail_malformed_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 135))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } if let &Some(ref msg) = update_fee { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { @@ -938,7 +911,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { @@ -948,7 +921,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { @@ -958,7 +931,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { @@ -968,17 +941,17 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { - let encoded_msg = encode_msg!(msg, 256); - let encoded_update_msg = encode_msg!(update_msg, 258); + let encoded_msg = encode_msg!(msg); + let encoded_update_msg = encode_msg!(update_msg); for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() || + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel(msg.contents.short_channel_id) { continue } @@ -999,10 +972,10 @@ impl PeerManager { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_update(msg).is_ok() { - let encoded_msg = encode_msg!(msg, 258); + let encoded_msg = encode_msg!(msg); for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() || + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel(msg.contents.short_channel_id) { continue } @@ -1024,7 +997,7 @@ impl PeerManager { log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); // This isn't guaranteed to work, but if there is enough free // room in the send buffer, put the error message there... self.do_attempt_write_data(&mut descriptor, &mut peer); @@ -1044,7 +1017,7 @@ impl PeerManager { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, } @@ -1088,6 +1061,48 @@ impl PeerManager { } }; } + + /// This function should be called roughly once every 30 seconds. + /// It will send pings to each peer and disconnect those which did not respond to the last round of pings. + + /// Will most likely call send_data on all of the registered descriptors, thus, be very careful with reentrancy issues! + pub fn timer_tick_occured(&self) { + let mut peers_lock = self.peers.lock().unwrap(); + { + let peers = &mut *peers_lock; + let peers_needing_send = &mut peers.peers_needing_send; + let node_id_to_descriptor = &mut peers.node_id_to_descriptor; + let peers = &mut peers.peers; + + peers.retain(|descriptor, peer| { + if peer.awaiting_pong == true { + peers_needing_send.remove(descriptor); + match peer.their_node_id { + Some(node_id) => { + node_id_to_descriptor.remove(&node_id); + self.message_handler.chan_handler.peer_disconnected(&node_id, true); + }, + None => {} + } + } + + let ping = msgs::Ping { + ponglen: 0, + byteslen: 64, + }; + peer.pending_outbound_buffer.push_back(encode_msg!(&ping)); + let mut descriptor_clone = descriptor.clone(); + self.do_attempt_write_data(&mut descriptor_clone, peer); + + if peer.awaiting_pong { + false // Drop the peer + } else { + peer.awaiting_pong = true; + true + } + }); + } + } } #[cfg(test)] @@ -1118,22 +1133,31 @@ mod tests { fn disconnect_socket(&mut self) {} } - fn create_network(peer_count: usize) -> Vec> { + fn create_chan_handlers(peer_count: usize) -> Vec { + let mut chan_handlers = Vec::new(); + for _ in 0..peer_count { + let chan_handler = test_utils::TestChannelMessageHandler::new(); + chan_handlers.push(chan_handler); + } + + chan_handlers + } + + fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec) -> Vec> { let mut peers = Vec::new(); let mut rng = thread_rng(); let logger : Arc = Arc::new(test_utils::TestLogger::new()); let mut ephemeral_bytes = [0; 32]; rng.fill_bytes(&mut ephemeral_bytes); - for _ in 0..peer_count { - let chan_handler = test_utils::TestChannelMessageHandler::new(); + for i in 0..peer_count { let router = test_utils::TestRoutingMessageHandler::new(); let node_id = { let mut key_slice = [0;32]; rng.fill_bytes(&mut key_slice); SecretKey::from_slice(&key_slice).unwrap() }; - let msg_handler = MessageHandler { chan_handler: Arc::new(chan_handler), route_handler: Arc::new(router) }; + let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: Arc::new(router) }; let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger)); peers.push(peer); } @@ -1141,7 +1165,7 @@ mod tests { peers } - fn establish_connection(peer_a: &PeerManager, peer_b: &PeerManager) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) { let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peer_b.our_node_secret); let fd = FileDescriptor { fd: 1}; @@ -1153,22 +1177,39 @@ mod tests { fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and // push a DisconnectPeer event to remove the node flagged by id - let mut peers = create_network(2); + let chan_handlers = create_chan_handlers(2); + let chan_handler = test_utils::TestChannelMessageHandler::new(); + let mut peers = create_network(2, &chan_handlers); establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); - let chan_handler = test_utils::TestChannelMessageHandler::new(); chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { node_id: their_id, action: msgs::ErrorAction::DisconnectPeer { msg: None }, }); assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1); - peers[0].message_handler.chan_handler = Arc::new(chan_handler); + peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); } + #[test] + fn test_timer_tick_occured(){ + // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer. + let chan_handlers = create_chan_handlers(2); + let peers = create_network(2, &chan_handlers); + establish_connection(&peers[0], &peers[1]); + assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + + // peers[0] awaiting_pong is set to true, but the Peer is still connected + peers[0].timer_tick_occured(); + assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + + // Since timer_tick_occured() is called again when awaiting_pong is true, all Peers are disconnected + peers[0].timer_tick_occured(); + assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); + } }