import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.concurrent.ConcurrentLinkedQueue;
class HumanObjectPeerTestInstance {
class Peer {
class DescriptorHolder { SocketDescriptor val; }
- void do_read_event(ConcurrentLinkedQueue<Thread> list, PeerManager pm, SocketDescriptor descriptor, byte[] data) {
- Thread thread = new Thread(() -> {
- Result_boolPeerHandleErrorZ res = pm.read_event(descriptor, data);
- assert res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK;
- //assert ((Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK) res).res;
- });
- thread.start();
- list.add(thread);
+ boolean running = false;
+ final LinkedList<Runnable> runqueue = new LinkedList();
+ Thread t = new Thread(() -> {
+ while (true) {
+ try {
+ Runnable r;
+ synchronized (runqueue) {
+ while (runqueue.isEmpty()) {
+ runqueue.wait();
+ }
+ running = true;
+ r = runqueue.pollFirst();
+ }
+ r.run();
+ synchronized (runqueue) {
+ running = false;
+ runqueue.notifyAll();
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+ void wait_events_processed() {
+ while (true) {
+ synchronized (runqueue) {
+ if (runqueue.isEmpty() && !running) break;
+ try { runqueue.wait(); } catch (InterruptedException e) { assert false; }
+ }
+ }
+ }
+ void do_read_event(PeerManager pm, SocketDescriptor descriptor, byte[] data) {
+ if (!t.isAlive()) t.start();
+ synchronized (runqueue) {
+ runqueue.add(() -> {
+ Result_boolPeerHandleErrorZ res = pm.read_event(descriptor, data);
+ assert res instanceof Result_boolPeerHandleErrorZ.Result_boolPeerHandleErrorZ_OK;
+ });
+ runqueue.notifyAll();
+ }
must_free_objs.add(new WeakReference<>(data));
}
Peer peer1 = new Peer((byte) 1, use_km_wrapper, use_manual_watch);
Peer peer2 = new Peer((byte) 2, use_km_wrapper, use_manual_watch);
- ConcurrentLinkedQueue<Thread> list = new ConcurrentLinkedQueue<Thread>();
DescriptorHolder descriptor1 = new DescriptorHolder();
DescriptorHolder descriptor1ref = descriptor1;
SocketDescriptor descriptor2 = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() {
@Override
public long send_data(byte[] data, boolean resume_read) {
- do_read_event(list, peer1.peer_manager, descriptor1ref.val, data);
+ do_read_event(peer1.peer_manager, descriptor1ref.val, data);
return data.length;
}
descriptor1.val = SocketDescriptor.new_impl(new SocketDescriptor.SocketDescriptorInterface() {
@Override
public long send_data(byte[] data, boolean resume_read) {
- do_read_event(list, peer2.peer_manager, descriptor2, data);
+ do_read_event(peer2.peer_manager, descriptor2, data);
return data.length;
}
Result_NonePeerHandleErrorZ inbound_conn_res = peer2.peer_manager.new_inbound_connection(descriptor2);
assert inbound_conn_res instanceof Result_NonePeerHandleErrorZ.Result_NonePeerHandleErrorZ_OK;
- do_read_event(list, peer2.peer_manager, descriptor2, ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) conn_res).res);
+ do_read_event(peer2.peer_manager, descriptor2, ((Result_CVec_u8ZPeerHandleErrorZ.Result_CVec_u8ZPeerHandleErrorZ_OK) conn_res).res);
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
Result_NoneAPIErrorZ cc_res = peer1.chan_manager.create_channel(peer2.node_id, 10000, 1000, 42, null);
assert cc_res instanceof Result_NoneAPIErrorZ.Result_NoneAPIErrorZ_OK;
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
Event[] events = peer1.chan_manager_events.get_and_clear_pending_events();
assert events.length == 1;
peer1.chan_manager.funding_transaction_generated(chan_id, OutPoint.constructor_new(funding.getTxId().getReversedBytes(), (short) 0));
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
events = peer1.chan_manager_events.get_and_clear_pending_events();
assert events.length == 1;
peer1.peer_manager.process_events();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer1.chan_manager.list_channels();
ChannelDetails[] peer1_chans = peer1.chan_manager.list_channels();
assert payment_res instanceof Result_NonePaymentSendFailureZ.Result_NonePaymentSendFailureZ_OK;
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
events = peer2.chan_manager_events.get_and_clear_pending_events();
assert events.length == 1;
peer2.chan_manager.claim_funds(payment_preimage, new byte[32], ((Event.PaymentReceived) events[0]).amt);
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
events = peer1.chan_manager_events.get_and_clear_pending_events();
assert events.length == 1;
assert close_res instanceof Result_NoneAPIErrorZ.Result_NoneAPIErrorZ_OK;
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
assert peer1.broadcast_set.size() == 1;
assert peer2.broadcast_set.size() == 1;
peer1.chan_manager.force_close_all_channels();
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer1.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
peer2.peer_manager.process_events();
- while (!list.isEmpty()) { list.poll().join(); }
+ wait_events_processed();
assert peer1.broadcast_set.size() == 1;
assert peer2.broadcast_set.size() == 0;