From: Tibo-lg Date: Wed, 14 Jul 2021 02:44:36 +0000 (+0900) Subject: Add unknown message handler to peer manager X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=refs%2Fheads%2F2021-07-custom-msg-suggestion;p=rust-lightning Add unknown message handler to peer manager --- diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index f68cc8f3d..b57869871 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -34,7 +34,7 @@ use lightning::chain::transaction::OutPoint; use lightning::chain::keysinterface::{InMemorySigner, KeysInterface}; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::ln::channelmanager::{ChainParameters, ChannelManager}; -use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor}; +use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,IgnoringUnknownMessageHandler}; use lightning::ln::msgs::DecodeError; use lightning::routing::router::get_route; use lightning::routing::network_graph::NetGraphMsgHandler; @@ -159,7 +159,7 @@ type ChannelMan = ChannelManager< EnforcingSigner, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; -type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc>; +type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc, Arc>; struct MoneyLossDetector<'a> { manager: Arc, @@ -374,7 +374,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { chan_handler: channelmanager.clone(), route_handler: net_graph_msg_handler.clone(), - }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger))); + }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), Arc::new(IgnoringUnknownMessageHandler{}))); let mut should_forward = false; let mut payments_received: Vec = Vec::new(); diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 3a36bb562..93d259a51 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -15,7 +15,7 @@ use lightning::chain::channelmonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; -use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; +use lightning::ln::peer_handler::{PeerManager, SocketDescriptor, UnknownMessageHandler}; use lightning::util::events::{EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; @@ -118,7 +118,8 @@ impl BackgroundProcessor { CMP: 'static + Send + ChannelManagerPersister, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PM: 'static + Deref> + Send + Sync, + UMH: 'static + Deref + Send + Sync, + PM: 'static + Deref> + Send + Sync, > (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self where @@ -131,6 +132,7 @@ impl BackgroundProcessor { P::Target: 'static + channelmonitor::Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, + UMH::Target: 'static + UnknownMessageHandler, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -182,7 +184,7 @@ mod tests { use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; - use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringUnknownMessageHandler}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; @@ -210,7 +212,7 @@ mod tests { struct Node { node: Arc>, - peer_manager: Arc, Arc, Arc>>, + peer_manager: Arc, Arc, Arc, Arc>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -251,7 +253,7 @@ mod tests { let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; - let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), Arc::new(IgnoringUnknownMessageHandler {}))); let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 5f5fece0d..68a25a75e 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -37,7 +37,7 @@ //! type DataPersister = dyn lightning::chain::channelmonitor::Persist + Send + Sync; //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = Arc>; -//! type PeerManager = Arc>; +//! type PeerManager = Arc>>; //! //! // Connect to node with pubkey their_node_id at addr: //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { @@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; +use lightning::ln::peer_handler::UnknownMessageHandler; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::util::logger::Logger; @@ -119,10 +120,11 @@ struct Connection { id: u64, } impl Connection { - async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized { + L: Logger + 'static + ?Sized, + UMH: UnknownMessageHandler + 'static { // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -222,10 +224,11 @@ impl Connection { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -262,10 +265,11 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -332,10 +336,11 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } @@ -467,7 +472,7 @@ impl Hash for SocketDescriptor { mod tests { use lightning::ln::features::*; use lightning::ln::msgs::*; - use lightning::ln::peer_handler::{MessageHandler, PeerManager}; + use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringUnknownMessageHandler}; use lightning::util::events::*; use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey}; @@ -563,7 +568,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - }, a_key.clone(), &[1; 32], Arc::new(TestLogger()))); + }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(IgnoringUnknownMessageHandler {}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -577,7 +582,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - }, b_key.clone(), &[2; 32], Arc::new(TestLogger()))); + }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(IgnoringUnknownMessageHandler {}))); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 96ec31c98..5a47f39b5 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -68,6 +68,47 @@ impl Deref for IgnoringMessageHandler { fn deref(&self) -> &Self { self } } +/// A dummy implementation of `UnknownMessageHandler` that does nothing. +pub struct IgnoringUnknownMessageHandler{} +impl MessageSendEventsProvider for IgnoringUnknownMessageHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { + Vec::new() + } +} + +/// Define a dummy type to satisfy the constraint of UnknownMessageHandle `Message` +/// associated type for implementing it for IgnoringUnknownMessageHandler. +type DummyType = (); +impl Encode for DummyType { + const TYPE: u16 = 0; +} +impl Writeable for DummyType { + fn write(&self, _writer: &mut W) -> Result<(), ::std::io::Error> { + Ok(()) + } +} + +impl UnknownMessageHandler for IgnoringUnknownMessageHandler { + type MessageEnum = (); + type Message = DummyType; + fn read(&self, _message_type: u16, _buffer: &mut R) -> Result, msgs::DecodeError> { + Ok(None) + } + + fn handle_unknown_message(&self, _msg: Self::MessageEnum) -> Result<(), MessageHandlingError> { + // Since we always return `None` in the read the handle method should never be called. + unreachable!(); + } + + fn get_and_clear_pending_msgs(&self) -> Vec<(&PublicKey, Self::Message)> { + Vec::new() + } +} +impl Deref for IgnoringUnknownMessageHandler { + type Target = IgnoringUnknownMessageHandler; + fn deref(&self) -> &Self { self } +} + /// A dummy struct which implements `ChannelMessageHandler` without having any channels. /// You can provide one of these as the route_handler in a MessageHandler. pub struct ErroringMessageHandler { @@ -173,6 +214,23 @@ pub struct MessageHandler where pub route_handler: RM, } +/// Handler for messages external to the LN protocol. +pub trait UnknownMessageHandler where Self::Message : Encode + Writeable + Debug { + /// A type that represents a message that can be sent over the wire + type Message; + /// A type that represents an enumeration of messages that can be handled by the handler. + type MessageEnum; + /// + fn read(&self, msg_type: u16, buffer: &mut R) -> Result, msgs::DecodeError>; + /// Called with the message type that was received and the buffer to be read. If the handler + /// could handle the message, should return `Ok(Some(wire::Message::HandledUnknownMessage(msg_type)))`, + /// otherwise Ok(None). Can also return a `DecodingError` if the buffer contained unexpected data + /// for the given message type. + fn handle_unknown_message(&self, msg: Self::MessageEnum) -> Result<(), MessageHandlingError>; + /// Get messages to be sent to specified peers. + fn get_and_clear_pending_msgs(&self) -> Vec<(&PublicKey, Self::Message)>; +} + /// Provides an object which can be used to send data to and which uniquely identifies a connection /// to a remote host. You will need to be able to generate multiple of these which meet Eq and /// implement Hash to meet the PeerManager API. @@ -311,7 +369,7 @@ fn _check_usize_is_32_or_64() { /// lifetimes). Other times you can afford a reference, which is more efficient, in which case /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. -pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc>; +pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc, UMH>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -319,7 +377,7 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L, UMH> = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L, UMH>; /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -340,14 +398,16 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = P /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + UMH::Target: UnknownMessageHandler { message_handler: MessageHandler, peers: Mutex>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, + unknown_message_handler: UMH, // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64 // bits we will never realistically count into high: @@ -357,8 +417,11 @@ pub struct PeerManager PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message @@ -396,11 +459,11 @@ impl PeerManager PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message @@ -416,18 +479,19 @@ impl PeerManager PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + UMH::Target: UnknownMessageHandler { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, unknown_message_handler: UMH) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -442,6 +506,7 @@ impl PeerManager PeerManager x, - Err(e) => { - match e { - msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }), - msgs::DecodeError::UnknownRequiredFeature => { - log_trace!(self.logger, "Got a channel/node announcement with an known required feature flag, you may want to update!"); - continue; - } - msgs::DecodeError::InvalidValue => { - log_debug!(self.logger, "Got an invalid value while deserializing message"); - return Err(PeerHandleError { no_connection_possible: false }); - } - msgs::DecodeError::ShortRead => { - log_debug!(self.logger, "Deserialization failed due to shortness of message"); - return Err(PeerHandleError { no_connection_possible: false }); - } - msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }), - msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }), - msgs::DecodeError::UnsupportedCompression => { - log_trace!(self.logger, "We don't support zlib-compressed message fields, ignoring message"); - continue; + let mut message_result = wire::read(&mut ::std::io::Cursor::new(&msg_data[..])); + + // Need an Init as first message + if let Ok(wire::Message::Init(_)) = message_result { + } else if peer.their_features.is_none() { + log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); + return Err(PeerHandleError{ no_connection_possible: false }.into()); + } + + let mut message_err = Ok(()); + let mut handle_err = Ok(()); + match message_result { + Ok(wire::Message::Unknown(msg_type)) => { + let mut type_bytes = [0; 2]; + let mut reader = ::std::io::Cursor::new(&msg_data[..]); + reader.read_exact(&mut type_bytes).expect("How did we read these to begin with?"); + match self.unknown_message_handler.read(*msg_type, &mut reader) { + Ok(Some(msg)) => { + handle_err = self.unknown_message_handler.handle_unknown_message(msg); + }, + Ok(None) => { + if *msg_type % 2 == 0 { + return Err(PeerHandleError { no_connection_possible: true }); + } } + Err(e) => { message_err = Err(e); }, } - } - }; - - match self.handle_message(peer, message) { - Err(handling_error) => match handling_error { - MessageHandlingError::PeerHandleError(e) => { return Err(e) }, - MessageHandlingError::LightningError(e) => { - try_potential_handleerror!(Err(e)); - }, }, - Ok(Some(msg)) => { - peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set")); - msgs_to_forward.push(msg); + Ok(msg) => { + match self.handle_message(peer, msg) { + Ok(Some(forward_msg)) => { + peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set")); + msgs_to_forward.push(forward_msg); + }, + Ok(None) => {}, + Err(e) => { handle_err = Err(e); }, + } }, - Ok(None) => {}, + Err(e) => { message_err = Err(e); }, + } + + match message_err { + Ok(()) => {}, + Err(e) => match e { + msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::UnknownRequiredFeature => { + log_trace!(self.logger, "Got a channel/node announcement with an known required feature flag, you may want to update!"); + continue; + } + msgs::DecodeError::InvalidValue => { + log_debug!(self.logger, "Got an invalid value while deserializing message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + msgs::DecodeError::ShortRead => { + log_debug!(self.logger, "Deserialization failed due to shortness of message"); + return Err(PeerHandleError { no_connection_possible: false }); + } + msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::UnsupportedCompression => { + log_trace!(self.logger, "We don't support zlib-compressed message fields, ignoring message"); + continue; + } + }, + } + + match handle_err { + Ok(()) => {}, + Err(e) => + match e { + MessageHandlingError::PeerHandleError(e) => { return Err(e) }, + MessageHandlingError::LightningError(e) => { + try_potential_handleerror!(Err(e)); + } + } } } } @@ -868,13 +967,6 @@ impl PeerManager Result, MessageHandlingError> { log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap())); - // Need an Init as first message - if let wire::Message::Init(_) = message { - } else if peer.their_features.is_none() { - log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: false }.into()); - } - let mut should_forward = None; match message { @@ -1033,14 +1125,9 @@ impl PeerManager { - log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }.into()); + wire::Message::Unknown(_msg) => { + // Handled in `do_read_event`. }, - wire::Message::Unknown(msg_type) => { - log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); - } }; Ok(should_forward) } @@ -1135,27 +1222,29 @@ impl PeerManager { - { - match peers.node_id_to_descriptor.get($node_id) { - Some(descriptor) => match peers.peers.get_mut(&descriptor) { - Some(peer) => { - if peer.their_features.is_none() { - continue; - } - peer - }, - None => panic!("Inconsistent peers set state!"), - }, - None => { - continue; + + macro_rules! get_peer_for_forwarding { + ($node_id: expr) => { + { + match peers.node_id_to_descriptor.get($node_id) { + Some(descriptor) => match peers.peers.get_mut(&descriptor) { + Some(peer) => { + if peer.their_features.is_none() { + continue; + } + peer }, - } + None => panic!("Inconsistent peers set state!"), + }, + None => { + continue; + }, } } } + } + + for event in events_generated.drain(..) { match event { MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", @@ -1322,10 +1411,14 @@ impl PeerManager PeerManager(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let ephemeral_bytes = [i as u8; 32]; let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; - let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger); + let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringUnknownMessageHandler {}); peers.push(peer); } peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index 0ee280b50..7a495d98f 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -62,6 +62,13 @@ pub enum Message { #[derive(Clone, Copy, Debug)] pub struct MessageType(u16); +impl ::std::ops::Deref for MessageType { + type Target = u16; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl Message { #[allow(dead_code)] // This method is only used in tests /// Returns the type that was used to decode the message payload. diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 0fc7c6b3a..122b2efc1 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -433,7 +433,7 @@ pub enum MessageSendEvent { node_id: PublicKey, /// The reply_channel_range which should be sent. msg: msgs::ReplyChannelRange, - } + }, } /// A trait indicating an object may generate message send events