X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=edc9c68a5068c25b419a5e21dc16a4cb7d850ac9;hb=2d213a47bffe2f3d9b744fe64e0001d90a4bf8fa;hp=995901a3b54f7355bf50d430992b06da904f70c8;hpb=be6f263825e0c75d32d6d48fd5dff9986ca6b011;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 995901a3..edc9c68a 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -46,16 +46,23 @@ use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::sha256::HashEngine as Sha256Engine; use bitcoin::hashes::{HashEngine, Hash}; -/// Handler for BOLT1-compliant messages. +/// A handler provided to [`PeerManager`] for reading and handling custom messages. +/// +/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific +/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the +/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler. +/// +/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md +/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message pub trait CustomMessageHandler: wire::CustomMessageReader { - /// Called with the message type that was received and the buffer to be read. - /// Can return a `MessageHandlingError` if the message could not be handled. + /// Handles the given message sent from `sender_node_id`, possibly producing messages for + /// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`] + /// to send. fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>; - /// Gets the list of pending messages which were generated by the custom message - /// handler, clearing the list in the process. The first tuple element must - /// correspond to the intended recipients node ids. If no connection to one of the - /// specified node does not exist, the message is simply not sent to it. + /// Returns the list of pending messages that were generated by the handler, clearing the list + /// in the process. Each message is paired with the node id of the intended recipient. If no + /// connection to the node exists, then the message is simply not sent. fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>; } @@ -72,7 +79,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option, Option)> { None } fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option { None } - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } @@ -88,7 +95,7 @@ impl OnionMessageProvider for IgnoringMessageHandler { } impl OnionMessageHandler for IgnoringMessageHandler { fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn peer_disconnected(&self, _their_node_id: &PublicKey) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { @@ -224,7 +231,7 @@ impl ChannelMessageHandler for ErroringMessageHandler { // msgs::ChannelUpdate does not contain the channel_id field, so we just drop them. fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {} fn peer_disconnected(&self, _their_node_id: &PublicKey) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { @@ -418,6 +425,8 @@ struct Peer { /// `channel_announcement` at all - we set this unconditionally but unset it every time we /// check if we're gossip-processing-backlogged). received_channel_announce_since_backlogged: bool, + + inbound_connection: bool, } impl Peer { @@ -806,33 +815,40 @@ impl { + debug_assert!(false, "PeerManager driver duplicated descriptors!"); + Err(PeerHandleError {}) + }, + hash_map::Entry::Vacant(e) => { + e.insert(Mutex::new(Peer { + channel_encryptor: peer_encryptor, + their_node_id: None, + their_features: None, + their_net_address: remote_network_address, + + pending_outbound_buffer: LinkedList::new(), + pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), + awaiting_write_event: false, + + pending_read_buffer, + pending_read_buffer_pos: 0, + pending_read_is_header: false, + + sync_status: InitSyncTracker::NoSyncRequested, + + msgs_sent_since_pong: 0, + awaiting_pong_timer_tick_intervals: 0, + received_message_since_timer_tick: false, + sent_gossip_timestamp_filter: false, + + received_channel_announce_since_backlogged: false, + inbound_connection: false, + })); + Ok(res) + } + } } /// Indicates a new inbound connection has been established to a node with an optional remote @@ -855,33 +871,40 @@ impl { + debug_assert!(false, "PeerManager driver duplicated descriptors!"); + Err(PeerHandleError {}) + }, + hash_map::Entry::Vacant(e) => { + e.insert(Mutex::new(Peer { + channel_encryptor: peer_encryptor, + their_node_id: None, + their_features: None, + their_net_address: remote_network_address, + + pending_outbound_buffer: LinkedList::new(), + pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), + awaiting_write_event: false, + + pending_read_buffer, + pending_read_buffer_pos: 0, + pending_read_is_header: false, + + sync_status: InitSyncTracker::NoSyncRequested, + + msgs_sent_since_pong: 0, + awaiting_pong_timer_tick_intervals: 0, + received_message_since_timer_tick: false, + sent_gossip_timestamp_filter: false, + + received_channel_announce_since_backlogged: false, + inbound_connection: true, + })); + Ok(()) + } + } } fn peer_should_read(&self, peer: &mut Peer) -> bool { @@ -1130,9 +1153,13 @@ impl { match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { - hash_map::Entry::Occupied(_) => { + hash_map::Entry::Occupied(e) => { log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event + // Check that the peers map is consistent with the + // node_id_to_descriptor map, as this has been broken + // before. + debug_assert!(peers.get(e.get()).is_some()); return Err(PeerHandleError { }) }, hash_map::Entry::Vacant(entry) => { @@ -1310,15 +1337,15 @@ impl { let peer = peer_lock.lock().unwrap(); - if !peer.handshake_complete() { return; } - debug_assert!(peer.their_node_id.is_some()); if let Some((node_id, _)) = peer.their_node_id { log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id)); - self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + debug_assert!(removed.is_some(), "descriptor maps should be consistent"); + if !peer.handshake_complete() { return; } self.message_handler.chan_handler.peer_disconnected(&node_id); self.message_handler.onion_message_handler.peer_disconnected(&node_id); } @@ -2177,12 +2204,13 @@ mod tests { use crate::prelude::*; use crate::sync::{Arc, Mutex}; - use core::sync::atomic::Ordering; + use core::sync::atomic::{AtomicBool, Ordering}; #[derive(Clone)] struct FileDescriptor { fd: u16, outbound_data: Arc>>, + disconnect: Arc, } impl PartialEq for FileDescriptor { fn eq(&self, other: &Self) -> bool { @@ -2202,7 +2230,7 @@ mod tests { data.len() } - fn disconnect_socket(&mut self) {} + fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); } } struct PeerManagerCfg { @@ -2243,10 +2271,16 @@ mod tests { fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); @@ -2270,6 +2304,86 @@ mod tests { (fd_a.clone(), fd_b.clone()) } + #[test] + #[cfg(feature = "std")] + fn fuzz_threaded_connections() { + // Spawn two threads which repeatedly connect two peers together, leading to "got second + // connection with peer" disconnections and rapid reconnect. This previously found an issue + // with our internal map consistency, and is a generally good smoke test of disconnection. + let cfgs = Arc::new(create_peermgr_cfgs(2)); + // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. + let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ })); + + let start_time = std::time::Instant::now(); + macro_rules! spawn_thread { ($id: expr) => { { + let peers = Arc::clone(&peers); + let cfgs = Arc::clone(&cfgs); + std::thread::spawn(move || { + let mut ctr = 0; + while start_time.elapsed() < std::time::Duration::from_secs(1) { + let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let mut fd_b = FileDescriptor { + fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); + if peers[0].read_event(&mut fd_a, &initial_data).is_err() { break; } + + while start_time.elapsed() < std::time::Duration::from_secs(1) { + peers[0].process_events(); + if fd_a.disconnect.load(Ordering::Acquire) { break; } + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; } + + peers[1].process_events(); + if fd_b.disconnect.load(Ordering::Acquire) { break; } + let b_data = fd_b.outbound_data.lock().unwrap().split_off(0); + if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; } + + cfgs[0].chan_handler.pending_events.lock().unwrap() + .push(crate::util::events::MessageSendEvent::SendShutdown { + node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(), + msg: msgs::Shutdown { + channel_id: [0; 32], + scriptpubkey: bitcoin::Script::new(), + }, + }); + cfgs[1].chan_handler.pending_events.lock().unwrap() + .push(crate::util::events::MessageSendEvent::SendShutdown { + node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(), + msg: msgs::Shutdown { + channel_id: [0; 32], + scriptpubkey: bitcoin::Script::new(), + }, + }); + + if ctr % 2 == 0 { + peers[0].timer_tick_occurred(); + peers[1].timer_tick_occurred(); + } + } + + peers[0].socket_disconnected(&fd_a); + peers[1].socket_disconnected(&fd_b); + ctr += 1; + std::thread::sleep(std::time::Duration::from_micros(1)); + } + }) + } } } + let thrd_a = spawn_thread!(1); + let thrd_b = spawn_thread!(2); + + thrd_a.join().unwrap(); + thrd_b.join().unwrap(); + } + #[test] fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and @@ -2326,7 +2440,10 @@ mod tests { let cfgs = create_peermgr_cfgs(2); let peers = create_network(2, &cfgs); - let mut fd_dup = FileDescriptor { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let mut fd_dup = FileDescriptor { + fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003}; let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap(); peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap(); @@ -2430,8 +2547,14 @@ mod tests { let peers = create_network(2, &cfgs); let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; - let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();