From: Matt Corallo <git@bluematt.me>
Date: Thu, 2 Mar 2023 06:27:08 +0000 (+0000)
Subject: Fuzz rapid peer connection/disconnections in threads
X-Git-Tag: v0.0.114~9^2
X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=48946d9add1ca7f3e930dab1e4b16488e4200625;p=rust-lightning

Fuzz rapid peer connection/disconnections in threads

This test fails on the bug fixed two commits ago with the
additional assertions in the previous commit.
---

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index fc341e964..41bbf5b49 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -2204,12 +2204,13 @@ mod tests {
 
 	use crate::prelude::*;
 	use crate::sync::{Arc, Mutex};
-	use core::sync::atomic::Ordering;
+	use core::sync::atomic::{AtomicBool, Ordering};
 
 	#[derive(Clone)]
 	struct FileDescriptor {
 		fd: u16,
 		outbound_data: Arc<Mutex<Vec<u8>>>,
+		disconnect: Arc<AtomicBool>,
 	}
 	impl PartialEq for FileDescriptor {
 		fn eq(&self, other: &Self) -> bool {
@@ -2229,7 +2230,7 @@ mod tests {
 			data.len()
 		}
 
-		fn disconnect_socket(&mut self) {}
+		fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); }
 	}
 
 	struct PeerManagerCfg {
@@ -2270,10 +2271,16 @@ mod tests {
 
 	fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>) -> (FileDescriptor, FileDescriptor) {
 		let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
-		let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+		let mut fd_a = FileDescriptor {
+			fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
 		let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
 		let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
-		let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+		let mut fd_b = FileDescriptor {
+			fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
 		let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
 		let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
 		peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
@@ -2297,6 +2304,84 @@ mod tests {
 		(fd_a.clone(), fd_b.clone())
 	}
 
+	#[test]
+	#[cfg(feature = "std")]
+	fn fuzz_threaded_connections() {
+		// Spawn two threads which repeatedly connect two peers together, leading to "got second
+		// connection with peer" disconnections and rapid reconnect. This previously found an issue
+		// with our internal map consistency, and is a generally good smoke test of disconnection.
+		let cfgs = Arc::new(create_peermgr_cfgs(2));
+		// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
+		let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));
+
+		let start_time = std::time::Instant::now();
+		macro_rules! spawn_thread { ($id: expr) => { {
+			let peers = Arc::clone(&peers);
+			let cfgs = Arc::clone(&cfgs);
+			std::thread::spawn(move || {
+				let mut ctr = 0;
+				while start_time.elapsed() < std::time::Duration::from_secs(1) {
+					let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
+					let mut fd_a = FileDescriptor {
+						fd: $id  + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
+						disconnect: Arc::new(AtomicBool::new(false)),
+					};
+					let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
+					let mut fd_b = FileDescriptor {
+						fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
+						disconnect: Arc::new(AtomicBool::new(false)),
+					};
+					let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
+					let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
+					peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
+					assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
+
+					while start_time.elapsed() < std::time::Duration::from_secs(1) {
+						peers[0].process_events();
+						if fd_a.disconnect.load(Ordering::Acquire) { break; }
+						let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+						if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; }
+
+						peers[1].process_events();
+						if fd_b.disconnect.load(Ordering::Acquire) { break; }
+						let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+						if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; }
+
+						cfgs[0].chan_handler.pending_events.lock().unwrap()
+							.push(crate::util::events::MessageSendEvent::SendShutdown {
+								node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(),
+								msg: msgs::Shutdown {
+									channel_id: [0; 32],
+									scriptpubkey: bitcoin::Script::new(),
+								},
+							});
+						cfgs[1].chan_handler.pending_events.lock().unwrap()
+							.push(crate::util::events::MessageSendEvent::SendShutdown {
+								node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(),
+								msg: msgs::Shutdown {
+									channel_id: [0; 32],
+									scriptpubkey: bitcoin::Script::new(),
+								},
+							});
+
+						peers[0].timer_tick_occurred();
+						peers[1].timer_tick_occurred();
+					}
+
+					peers[0].socket_disconnected(&fd_a);
+					peers[1].socket_disconnected(&fd_b);
+					ctr += 1;
+					std::thread::sleep(std::time::Duration::from_micros(1));
+				}
+			})
+		} } }
+		let thrd_a = spawn_thread!(1);
+		let thrd_b = spawn_thread!(2);
+
+		thrd_a.join().unwrap();
+		thrd_b.join().unwrap();
+	}
+
 	#[test]
 	fn test_disconnect_peer() {
 		// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2353,7 +2438,10 @@ mod tests {
 		let cfgs = create_peermgr_cfgs(2);
 		let peers = create_network(2, &cfgs);
 
-		let mut fd_dup = FileDescriptor { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+		let mut fd_dup = FileDescriptor {
+			fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
 		let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003};
 		let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
 		peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
@@ -2457,8 +2545,14 @@ mod tests {
 		let peers = create_network(2, &cfgs);
 
 		let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
-		let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
-		let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+		let mut fd_a = FileDescriptor {
+			fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
+		let mut fd_b = FileDescriptor {
+			fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
 		let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
 		peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();