X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=9c27a23467ce7a8e7134225f352fc961a3998ace;hb=113b0f1a0eb503181ed5d5b4d07f0ef83ab30770;hp=19f505b2c0d3c95f26c49c5e0d3c0fa925ad1ca2;hpb=712d97d3fe313ab5d94b191528c72d005a19d80b;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 19f505b2..9c27a234 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -36,9 +36,10 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::{Level, Logger, WithContext}; use crate::util::string::PrintableString; +#[allow(unused_imports)] use crate::prelude::*; + use crate::io; -use alloc::collections::VecDeque; use crate::sync::{Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -116,7 +117,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { - InitFeatures::empty() + let mut features = InitFeatures::empty(); + features.set_gossip_queries_optional(); + features } fn processing_queue_high(&self) -> bool { false } } @@ -1589,15 +1592,37 @@ impl, - mut peer_lock: MutexGuard, - message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> - ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { + peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; let logger = WithContext::from(&self.logger, Some(their_node_id), None); + + let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? { + Some(processed_message) => processed_message, + None => return Ok(None), + }; + + self.do_handle_message_without_peer_lock(peer_mutex, message, &their_node_id, &logger) + } + + // Conducts all message processing that requires us to hold the `peer_lock`. + // + // Returns `None` if the message was fully processed and otherwise returns the message back to + // allow it to be subsequently processed by `do_handle_message_without_peer_lock`. + fn do_handle_message_holding_peer_lock<'a>( + &self, + mut peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { peer_lock.received_message_since_timer_tick = true; // Need an Init as first message @@ -1678,8 +1703,20 @@ impl( + &self, + peer_mutex: &Mutex, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { if is_gossip_msg(message.type_id()) { log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id)); } else { @@ -1881,7 +1918,7 @@ impl>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); @@ -2273,7 +2310,7 @@ impl::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); peers_to_disconnect.insert(node_id, msg); }, msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { @@ -2646,11 +2683,13 @@ mod tests { use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{PublicKey, SecretKey}; - use crate::prelude::*; use crate::sync::{Arc, Mutex}; use core::convert::Infallible; use core::sync::atomic::{AtomicBool, Ordering}; + #[allow(unused_imports)] + use crate::prelude::*; + #[derive(Clone)] struct FileDescriptor { fd: u16,