#[derive(Clone)]
struct FileDescriptor {
fd: u16,
+ hang_writes: Arc<AtomicBool>,
outbound_data: Arc<Mutex<Vec<u8>>>,
disconnect: Arc<AtomicBool>,
}
impl SocketDescriptor for FileDescriptor {
fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
- self.outbound_data.lock().unwrap().extend_from_slice(data);
- data.len()
+ if self.hang_writes.load(Ordering::Acquire) {
+ 0
+ } else {
+ self.outbound_data.lock().unwrap().extend_from_slice(data);
+ data.len()
+ }
}
fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); }
}
+ impl FileDescriptor {
+ fn new(fd: u16) -> Self {
+ Self {
+ fd,
+ hang_writes: Arc::new(AtomicBool::new(false)),
+ outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ }
+ }
+ }
+
struct PeerManagerCfg {
chan_handler: test_utils::TestChannelMessageHandler,
routing_handler: test_utils::TestRoutingMessageHandler,
cfgs.push(
PeerManagerCfg{
chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)),
- logger: test_utils::TestLogger::new(),
+ logger: test_utils::TestLogger::with_id(i.to_string()),
routing_handler: test_utils::TestRoutingMessageHandler::new(),
custom_handler: TestCustomMessageHandler { features },
node_signer: test_utils::TestNodeSigner::new(node_secret),
let fd = FD_COUNTER.fetch_add(1, Ordering::Relaxed) as u16;
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
- let mut fd_a = FileDescriptor {
- fd, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_a = FileDescriptor::new(fd);
let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
let features_a = peer_a.init_features(id_b);
let features_b = peer_b.init_features(id_a);
- let mut fd_b = FileDescriptor {
- fd, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_b = FileDescriptor::new(fd);
let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
let mut ctr = 0;
while start_time.elapsed() < std::time::Duration::from_secs(1) {
let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
- let mut fd_a = FileDescriptor {
- fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_a = FileDescriptor::new($id + ctr * 3);
let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
- let mut fd_b = FileDescriptor {
- fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_b = FileDescriptor::new($id + ctr * 3);
let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
for (peer_a, peer_b) in peer_pairs.iter() {
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
- let mut fd_a = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_a = FileDescriptor::new(1);
let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
- let mut fd_b = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_b = FileDescriptor::new(1);
let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
for (peer_a, peer_b) in peer_pairs.iter() {
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
- let mut fd_a = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_a = FileDescriptor::new(1);
let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000};
- let mut fd_b = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_b = FileDescriptor::new(1);
let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
let cfgs = create_peermgr_cfgs(2);
let peers = create_network(2, &cfgs);
- let mut fd_dup = FileDescriptor {
- fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_dup = FileDescriptor::new(3);
let addr_dup = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1003};
let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
let peers = create_network(2, &cfgs);
let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
- let mut fd_a = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
- let mut fd_b = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_a = FileDescriptor::new(1);
+ let mut fd_b = FileDescriptor::new(1);
let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
}, 0, &[1; 32], &logger, &node_signer_b);
let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap();
- let mut fd_a = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
- let mut fd_b = FileDescriptor {
- fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
- disconnect: Arc::new(AtomicBool::new(false)),
- };
+ let mut fd_a = FileDescriptor::new(1);
+ let mut fd_b = FileDescriptor::new(1);
// Exchange messages with both peers until they both complete the init handshake.
let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();