package io.vertx.ext.stomp.impl;

import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.core.ConditionFactory;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.ServerFrame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompClientOptions;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.StompServerOptions;
import io.vertx.ext.stomp.utils.Headers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/ext/stomp/impl/StompClientImplTest.class */
/**
 * Tests for the STOMP client, run against a {@link StompServer} that {@link #setUp()} starts on a
 * fresh {@link Vertx} instance before each test and that {@link #tearDown()} closes afterwards.
 *
 * <p>Several tests replace that default server with one configured with a heartbeat; they do so
 * through {@link #restartServer(StompServerOptions, StompServerHandler)} so that
 * {@link #tearDown()} always closes whichever server is currently stored in the {@code server}
 * field.
 */
public class StompClientImplTest {
    private Vertx vertx;
    private StompServer server;
    private StompServerOptions options;

    /** Creates the Vert.x instance and starts a server with default options, blocking until it listens. */
    @Before
    public void setUp() {
        AsyncLock listening = new AsyncLock();
        this.vertx = Vertx.vertx();
        this.options = new StompServerOptions();
        this.server = StompServer.create(this.vertx, this.options)
            .handler(StompServerHandler.create(this.vertx))
            .listen(listening.handler());
        listening.waitForSuccess();
    }

    /** Closes the current server and then the Vert.x instance, waiting for each to complete. */
    @After
    public void tearDown() {
        AsyncLock serverClosed = new AsyncLock();
        this.server.close(serverClosed.handler());
        serverClosed.waitForSuccess();

        AsyncLock vertxClosed = new AsyncLock();
        this.vertx.close(vertxClosed.handler());
        vertxClosed.waitForSuccess();
    }

    @Test
    public void testConnection() throws InterruptedException {
        assertConnects(StompClient.create(this.vertx));
    }

    @Test
    public void testConnectionWithTrailingLine() throws InterruptedException {
        this.options.setTrailingLine(true);
        assertConnects(StompClient.create(this.vertx, new StompClientOptions().setTrailingLine(true)));
    }

    @Test
    public void testConnectionWithStompFrame() throws InterruptedException {
        assertConnects(StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true)));
    }

    @Test
    public void testConnectionWithStompFrameWithTrailingLine() throws InterruptedException {
        this.options.setTrailingLine(true);
        assertConnects(StompClient.create(this.vertx,
            new StompClientOptions().setUseStompFrame(true).setTrailingLine(true)));
    }

    @Test
    public void testSendingMessages() {
        assertSendReportsFrame(StompClient.create(this.vertx));
    }

    @Test
    public void testSendingMessagesWithTrailingLine() {
        this.options.setTrailingLine(true);
        assertSendReportsFrame(StompClient.create(this.vertx, new StompClientOptions().setTrailingLine(true)));
    }

    /** Connects, disconnects, and checks that the disconnect handler is given a frame. */
    @Test
    public void testConnectionAndDisconnect() throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Frame> reported = new AtomicReference<>();
        StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true)).connect(ar -> {
            if (ar.failed()) {
                // Leave the reference empty so the assertion below reports the failure.
                done.countDown();
                return;
            }
            ar.result().disconnect(frame -> {
                reported.set(frame);
                done.countDown();
            });
        });
        Assert.assertTrue("timed out waiting for connect/disconnect", done.await(1L, TimeUnit.MINUTES));
        Assert.assertNotNull(reported.get());
    }

    /**
     * Disconnects with a caller-built DISCONNECT frame carrying a {@code message} header and checks
     * that the frame given to the disconnect handler exposes that header.
     */
    @Test
    public void testConnectionAndDisconnectWithCustomFrame() throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Frame> reported = new AtomicReference<>();
        StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true)).connect(ar -> {
            if (ar.failed()) {
                // Leave the reference empty so the assertion below reports the failure.
                done.countDown();
                return;
            }
            Frame goodbye = new Frame(Frame.Command.DISCONNECT, Headers.create("message", "bye bye"), (Buffer) null);
            ar.result().disconnect(goodbye, frame -> {
                reported.set(frame);
                done.countDown();
            });
        });
        Assert.assertTrue("timed out waiting for connect/disconnect", done.await(1L, TimeUnit.MINUTES));
        Assert.assertNotNull(reported.get());
        Assertions.assertThat(reported.get().getHeader("message")).contains("bye bye");
    }

    /**
     * The server's ping handler is replaced by a no-op; the test then expects the client
     * connection's session to become {@code null} within one second.
     */
    @Test
    public void testClientHeartbeatWhenNoServerActivity() {
        AtomicReference<StompClientConnection> connection = new AtomicReference<>();
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(100)),
            StompServerHandler.create(this.vertx).pingHandler(ignored -> {
                // Intentionally empty: this test needs a server that does nothing on ping.
            }));
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(100)))
            .connect(ar -> connection.set(ar.result()));
        Awaitility.await().atMost(1000L, TimeUnit.MILLISECONDS).until(() -> hasLostSession(connection));
    }

    /**
     * With the default server handler in place, the connection is expected to still report a
     * server one second after connecting.
     */
    @Test
    public void testClientHeartbeatWithServerActivity() throws InterruptedException {
        AtomicReference<StompClientConnection> connection = new AtomicReference<>();
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(100)),
            StompServerHandler.create(this.vertx));
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(100)))
            .connect(ar -> connection.set(ar.result()));
        // Deliberate fixed wait: the point is that the connection is still usable afterwards.
        Thread.sleep(1000L);
        Assertions.assertThat(connection.get()).isNotNull();
        Assertions.assertThat(connection.get().server()).isNotNull();
    }

    /**
     * The client's ping handler is replaced by a no-op; the test then expects the client
     * connection's session to become {@code null} within one second.
     */
    @Test
    public void testServerHeartbeatWhenNoClientActivity() {
        AtomicReference<StompClientConnection> connection = new AtomicReference<>();
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(100)),
            StompServerHandler.create(this.vertx));
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(100))).connect(ar -> {
            connection.set(ar.result());
            ar.result().pingHandler(ignored -> {
                // Intentionally empty: this test needs a client that does nothing on ping.
            });
        });
        Awaitility.await().atMost(1L, TimeUnit.SECONDS).until(() -> hasLostSession(connection));
    }

    /**
     * The server pings only until the client has connected; the client's connection-dropped
     * handler is then expected to fire within ten seconds.
     */
    @Test
    public void testConnectionDroppedHandler() throws InterruptedException {
        AtomicBoolean serverPings = new AtomicBoolean(true);
        AtomicBoolean dropped = new AtomicBoolean(false);
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(100)),
            StompServerHandler.create(this.vertx).pingHandler(serverConnection -> {
                if (serverPings.get()) {
                    serverConnection.ping();
                }
            }));
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(100))).connect(ar -> {
            ar.result().connectionDroppedHandler(ignored -> dropped.set(true));
            serverPings.set(false);
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(dropped::get);
    }

    /**
     * After the server stops pinging, the client is expected to see one drop, connect a second
     * time, and have both of its "some body N" messages received by the server.
     */
    @Test
    public void testReconnection() throws InterruptedException {
        AtomicBoolean serverPings = new AtomicBoolean(true);
        AtomicInteger drops = new AtomicInteger(0);
        AtomicInteger connections = new AtomicInteger();
        // Appended to by the server's frame handler while the Awaitility thread iterates it.
        List<ServerFrame> received = new CopyOnWriteArrayList<>();
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(1000)),
            StompServerHandler.create(this.vertx)
                .pingHandler(serverConnection -> {
                    if (serverPings.get()) {
                        serverConnection.ping();
                    }
                })
                .receivedFrameHandler(received::add));

        StompClient client = StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(1000)));
        client.connect(getConnectionHandler(client, serverPings, drops, connections));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> drops.get() == 1);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> connections.get() == 2);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS)
            .until(() -> containsClientFrame(received, 1) && containsClientFrame(received, 2));
    }

    /**
     * Same scenario as {@link #testReconnection()}, except that the server closes itself from its
     * ping handler once pinging is disabled, so only the first connection and the first message
     * are expected.
     */
    @Test
    public void testReconnectionWithDeadServer() throws InterruptedException {
        AtomicBoolean serverPings = new AtomicBoolean(true);
        AtomicInteger drops = new AtomicInteger(0);
        AtomicInteger connections = new AtomicInteger();
        // Appended to by the server's frame handler while the Awaitility thread iterates it.
        List<ServerFrame> received = new CopyOnWriteArrayList<>();
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(1000)),
            StompServerHandler.create(this.vertx)
                .pingHandler(serverConnection -> {
                    if (serverPings.get()) {
                        serverConnection.ping();
                    } else {
                        this.server.close();
                    }
                })
                .receivedFrameHandler(received::add));

        StompClient client = StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(1000)));
        client.connect(getConnectionHandler(client, serverPings, drops, connections));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> drops.get() == 1);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> connections.get() == 1);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS)
            .until(() -> containsClientFrame(received, 1) && !containsClientFrame(received, 2));
    }

    /**
     * Tells whether any received frame has a non-null body whose text contains
     * {@code "some body " + i}.
     *
     * @param list frames received by the server
     * @param i    sequence number appended to the expected body text
     * @return {@code true} if a matching frame is present
     */
    private boolean containsClientFrame(List<ServerFrame> list, int i) {
        String expected = "some body " + i;
        return list.stream()
            .map(ServerFrame::frame)
            .anyMatch(frame -> frame.getBody() != null && frame.getBodyAsString().contains(expected));
    }

    /**
     * Builds the connect handler used by the reconnection tests. On a successful connection it
     * registers a connection-dropped handler that counts the drop and connects again with a new
     * handler of the same kind, increments the connection counter, switches the server ping flag
     * off, and sends {@code "some body N"} to {@code some-address}, where N is the new connection
     * count. Failed connection attempts are ignored.
     *
     * @param stompClient    client used for reconnecting
     * @param atomicBoolean  flag read by the server's ping handler; set to {@code false} here
     * @param atomicInteger  number of dropped connections
     * @param atomicInteger2 number of successful connections
     * @return the connect handler
     */
    private Handler<AsyncResult<StompClientConnection>> getConnectionHandler(StompClient stompClient, AtomicBoolean atomicBoolean, AtomicInteger atomicInteger, AtomicInteger atomicInteger2) {
        return ar -> {
            if (!ar.succeeded()) {
                return;
            }
            StompClientConnection connection = ar.result();
            connection.connectionDroppedHandler(ignored -> {
                atomicInteger.incrementAndGet();
                stompClient.connect(getConnectionHandler(stompClient, atomicBoolean, atomicInteger, atomicInteger2));
            });
            int attempt = atomicInteger2.incrementAndGet();
            atomicBoolean.set(false);
            connection.send("some-address", Buffer.buffer("some body " + attempt));
        };
    }

    /**
     * Closes the client explicitly and checks that, one second (timer-driven) later, the
     * connection-dropped handler has not been invoked.
     */
    @Test
    public void testThatDroppedHandlerIsNotCalledWhenTheClientIsClosing() {
        restartServer(
            new StompServerOptions().setHeartbeat(heartbeat(1000)),
            StompServerHandler.create(this.vertx));
        StompClient client = StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(heartbeat(1000)));
        AtomicBoolean dropped = new AtomicBoolean();
        AtomicBoolean connected = new AtomicBoolean();
        client.connect(ar -> {
            ar.result().connectionDroppedHandler(ignored -> dropped.set(true));
            connected.set(ar.succeeded());
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(connected::get);

        client.close();

        AtomicBoolean timerFired = new AtomicBoolean();
        this.vertx.setTimer(1000L, id -> timerFired.set(true));
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(timerFired::get);
        Assertions.assertThat(dropped.get()).isFalse();
    }

    /**
     * Connects with the given client and asserts that the connection succeeds within one minute
     * and reports a non-null session, server and version.
     */
    private void assertConnects(StompClient client) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<StompClientConnection> connection = new AtomicReference<>();
        client.connect(ar -> {
            if (ar.succeeded()) {
                connection.set(ar.result());
            }
            done.countDown();
        });
        Assert.assertTrue("timed out waiting for the connection", done.await(1L, TimeUnit.MINUTES));
        StompClientConnection established = connection.get();
        Assert.assertNotNull(established);
        Assert.assertNotNull(established.session());
        Assert.assertNotNull(established.server());
        Assert.assertNotNull(established.version());
    }

    /**
     * Connects with the given client, sends a message to {@code /hello}, and asserts that the
     * handler passed to {@code send} receives, within five seconds, a frame whose destination is
     * {@code /hello}. A failed connection surfaces as the await timing out.
     */
    private void assertSendReportsFrame(StompClient client) {
        AtomicReference<Frame> reported = new AtomicReference<>();
        client.connect(ar -> {
            if (ar.failed()) {
                return;
            }
            ar.result().send("/hello", Buffer.buffer("this is my content"), reported::set);
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAtomic(reported, Matchers.notNullValue(Frame.class));
        Assertions.assertThat(reported.get().getDestination()).isEqualTo("/hello");
    }

    /**
     * Closes the server currently held in the {@code server} field, waits for the close to
     * succeed, then starts a replacement with the given options and handler and waits until it is
     * listening.
     */
    private void restartServer(StompServerOptions serverOptions, StompServerHandler handler) {
        AsyncLock closed = new AsyncLock();
        this.server.close(closed.handler());
        closed.waitForSuccess();

        AsyncLock listening = new AsyncLock();
        this.server = StompServer.create(this.vertx, serverOptions).handler(handler).listen(listening.handler());
        listening.waitForSuccess();
    }

    /**
     * Builds a heartbeat configuration whose {@code "x"} and {@code "y"} entries both hold the
     * given value (presumably milliseconds, as in the STOMP heart-beat header - confirm).
     */
    private static JsonObject heartbeat(int value) {
        return new JsonObject().put("x", value).put("y", value);
    }

    /**
     * Tells whether a connection has been stored in the reference and its session is
     * {@code null}. The null guard matters because the condition may be polled before the
     * asynchronous connect has completed.
     */
    private static boolean hasLostSession(AtomicReference<StompClientConnection> connection) {
        StompClientConnection current = connection.get();
        return current != null && current.session() == null;
    }
}
