package io.vertx.ext.stomp.impl;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.utils.Headers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/ext/stomp/impl/ReceiptTest.class */
public class ReceiptTest {
    private Vertx vertx;
    private StompServer server;
    private List<StompClient> clients = new ArrayList();

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        AsyncLock asyncLock = new AsyncLock();
        this.vertx = Vertx.vertx();
        this.server = StompServer.create(this.vertx).handler(StompServerHandler.create(this.vertx)).listen(asyncLock.handler());
        asyncLock.waitForSuccess();
    }

    @After
    public void tearDown() {
        this.clients.forEach((v0) -> {
            v0.close();
        });
        this.clients.clear();
        AsyncLock asyncLock = new AsyncLock();
        this.server.close(asyncLock.handler());
        asyncLock.waitForSuccess();
        AsyncLock asyncLock2 = new AsyncLock();
        this.vertx.close(asyncLock2.handler());
        asyncLock2.waitForSuccess();
    }

    @Test
    public void testReceiptsOnSend() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            Handler handler = (v1) -> {
                r2.add(v1);
            };
            copyOnWriteArrayList2.getClass();
            stompClientConnection.subscribe("/queue", handler, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        Assertions.assertThat(copyOnWriteArrayList2).hasSize(1);
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).toString()).contains(new CharSequence[]{"SUBSCRIBE", "/queue"});
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Buffer buffer = Buffer.buffer("Hello");
            copyOnWriteArrayList2.getClass();
            stompClientConnection.send("/queue", buffer, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!copyOnWriteArrayList.isEmpty());
        });
        Assertions.assertThat(copyOnWriteArrayList2).hasSize(2);
    }

    @Test
    public void testReceiptsOnSubscribeAndUnsubscribe() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        AtomicReference atomicReference = new AtomicReference();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            Handler handler = (v1) -> {
                r2.add(v1);
            };
            copyOnWriteArrayList2.getClass();
            stompClientConnection.subscribe("/queue", handler, (v1) -> {
                r3.add(v1);
            });
        }));
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            atomicReference.set(stompClientConnection);
            copyOnWriteArrayList.getClass();
            Handler handler = (v1) -> {
                r2.add(v1);
            };
            copyOnWriteArrayList2.getClass();
            stompClientConnection.subscribe("/queue", handler, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult3.result();
            Buffer buffer = Buffer.buffer("Hello");
            copyOnWriteArrayList2.getClass();
            stompClientConnection.send("/queue", buffer, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= 2);
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 3);
        });
        StompClientConnection stompClientConnection = (StompClientConnection) atomicReference.get();
        copyOnWriteArrayList2.getClass();
        stompClientConnection.unsubscribe("/queue", (v1) -> {
            r2.add(v1);
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 4);
        });
    }

    @Test
    public void testReceiptsWithAck() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Headers create = Headers.create(new String[]{"ack", "client"});
            Handler handler = frame -> {
                stompClientConnection.ack(frame.getAck());
            };
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", create, handler, (v1) -> {
                r4.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Buffer buffer = Buffer.buffer("Hello");
            copyOnWriteArrayList.getClass();
            stompClientConnection.send("/queue", buffer, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2);
        });
    }

    @Test
    public void testReceiptsWithNack() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Headers create = Headers.create(new String[]{"ack", "client"});
            Handler handler = frame -> {
                stompClientConnection.nack(frame.getAck());
            };
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", create, handler, (v1) -> {
                r4.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Buffer buffer = Buffer.buffer("Hello");
            copyOnWriteArrayList.getClass();
            stompClientConnection.send("/queue", buffer, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2);
        });
    }

    @Test
    public void testReceiptsInCommittedTransactions() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList3 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList2.getClass();
            Handler handler = (v1) -> {
                r2.add(v1);
            };
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", handler, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 1);
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList3.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            copyOnWriteArrayList.getClass();
            stompClientConnection.beginTX("my-tx", (v1) -> {
                r2.add(v1);
            });
            Frame body = new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello"));
            copyOnWriteArrayList.getClass();
            stompClientConnection.send(body, (v1) -> {
                r2.add(v1);
            });
            Frame body2 = new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World"));
            copyOnWriteArrayList.getClass();
            stompClientConnection.send(body2, (v1) -> {
                r2.add(v1);
            });
            Frame body3 = new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!"));
            copyOnWriteArrayList.getClass();
            stompClientConnection.send(body3, (v1) -> {
                r2.add(v1);
            });
            copyOnWriteArrayList.getClass();
            stompClientConnection.commit("my-tx", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 3 && copyOnWriteArrayList3.isEmpty() && copyOnWriteArrayList.size() == 6);
        });
    }

    @Test
    public void testReceiptsInAbortedTransactions() throws InterruptedException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList3 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            Handler handler = (v1) -> {
                r2.add(v1);
            };
            copyOnWriteArrayList3.getClass();
            stompClientConnection.subscribe("/queue", handler, (v1) -> {
                r3.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList3.size() == 1);
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            copyOnWriteArrayList3.getClass();
            stompClientConnection.beginTX("my-tx", (v1) -> {
                r2.add(v1);
            });
            Frame body = new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello"));
            copyOnWriteArrayList3.getClass();
            stompClientConnection.send(body, (v1) -> {
                r2.add(v1);
            });
            Frame body2 = new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World"));
            copyOnWriteArrayList3.getClass();
            stompClientConnection.send(body2, (v1) -> {
                r2.add(v1);
            });
            Frame body3 = new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!"));
            copyOnWriteArrayList3.getClass();
            stompClientConnection.send(body3, (v1) -> {
                r2.add(v1);
            });
            copyOnWriteArrayList3.getClass();
            stompClientConnection.abort("my-tx", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 0 && copyOnWriteArrayList2.isEmpty() && copyOnWriteArrayList3.size() == 6);
        });
    }
}
