+ #[test]
+ #[cfg(feature = "std")]
+ fn fuzz_threaded_connections() {
+ // Spawn two threads which repeatedly connect two peers together, leading to "got second
+ // connection with peer" disconnections and rapid reconnect. This previously found an issue
+ // with our internal map consistency, and is a generally good smoke test of disconnection.
+ let cfgs = Arc::new(create_peermgr_cfgs(2));
+ // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
+ let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));
+
+ let start_time = std::time::Instant::now();
+ macro_rules! spawn_thread { ($id: expr) => { {
+ let peers = Arc::clone(&peers);
+ let cfgs = Arc::clone(&cfgs);
+ std::thread::spawn(move || {
+ let mut ctr = 0;
+ while start_time.elapsed() < std::time::Duration::from_secs(1) {
+ let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
+ let mut fd_a = FileDescriptor {
+ fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
+ let mut fd_b = FileDescriptor {
+ fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
+ let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
+ peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
+ if peers[0].read_event(&mut fd_a, &initial_data).is_err() { break; }
+
+ while start_time.elapsed() < std::time::Duration::from_secs(1) {
+ peers[0].process_events();
+ if fd_a.disconnect.load(Ordering::Acquire) { break; }
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; }
+
+ peers[1].process_events();
+ if fd_b.disconnect.load(Ordering::Acquire) { break; }
+ let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+ if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; }
+
+ cfgs[0].chan_handler.pending_events.lock().unwrap()
+ .push(crate::events::MessageSendEvent::SendShutdown {
+ node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(),
+ msg: msgs::Shutdown {
+ channel_id: ChannelId::new_zero(),
+ scriptpubkey: bitcoin::ScriptBuf::new(),
+ },
+ });
+ cfgs[1].chan_handler.pending_events.lock().unwrap()
+ .push(crate::events::MessageSendEvent::SendShutdown {
+ node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(),
+ msg: msgs::Shutdown {
+ channel_id: ChannelId::new_zero(),
+ scriptpubkey: bitcoin::ScriptBuf::new(),
+ },
+ });
+
+ if ctr % 2 == 0 {
+ peers[0].timer_tick_occurred();
+ peers[1].timer_tick_occurred();
+ }
+ }
+
+ peers[0].socket_disconnected(&fd_a);
+ peers[1].socket_disconnected(&fd_b);
+ ctr += 1;
+ std::thread::sleep(std::time::Duration::from_micros(1));
+ }
+ })
+ } } }
+ let thrd_a = spawn_thread!(1);
+ let thrd_b = spawn_thread!(2);
+
+ thrd_a.join().unwrap();
+ thrd_b.join().unwrap();
+ }
+
+ #[test]
+ fn test_feature_incompatible_peers() {
+ let cfgs = create_peermgr_cfgs(2);
+ let incompatible_cfgs = create_feature_incompatible_peermgr_cfgs(2);
+
+ let peers = create_network(2, &cfgs);
+ let incompatible_peers = create_network(2, &incompatible_cfgs);
+ let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
+ for (peer_a, peer_b) in peer_pairs.iter() {
+ let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
+ let mut fd_a = FileDescriptor {
+ fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
+ let mut fd_b = FileDescriptor {
+ fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
+ let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
+ peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
+ assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+ peer_a.process_events();
+
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
+ peer_b.process_events();
+ let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+
+ // Should fail because of unknown required features
+ assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
+ }
+ }
+
+ #[test]
+ fn test_chain_incompatible_peers() {
+ let cfgs = create_peermgr_cfgs(2);
+ let incompatible_cfgs = create_chain_incompatible_peermgr_cfgs(2);
+
+ let peers = create_network(2, &cfgs);
+ let incompatible_peers = create_network(2, &incompatible_cfgs);
+ let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
+ for (peer_a, peer_b) in peer_pairs.iter() {
+ let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
+ let mut fd_a = FileDescriptor {
+ fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
+ let mut fd_b = FileDescriptor {
+ fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
+ let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
+ peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
+ assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+ peer_a.process_events();
+
+ let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
+ peer_b.process_events();
+ let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+
+ // Should fail because of incompatible chains
+ assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
+ }
+ }
+