package io.vertx.amqp;

import io.vertx.core.Handler;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.proton.ProtonHelper;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Target;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:io/vertx/amqp/SenderTest.class */
public class SenderTest extends BareTestBase {
    private static final String ACCEPT = "accept";
    private static final String RELEASE = "release";
    private static final String MODIFY_FAILED = "modify-failed";
    private static final String MODIFY_FAILED_U_H = "modify-failed-u-h";
    private static final String REJECT = "reject";
    private static final String DEFAULT = "default";
    private static final String DURABLE = "durable";
    private static final String NON_DURABLE = "non-durable";
    private MockServer server;

    @Override // io.vertx.amqp.BareTestBase
    @After
    public void tearDown() throws InterruptedException {
        super.tearDown();
        if (this.server != null) {
            this.server.close();
        }
    }

    @Test(timeout = 20000)
    public void testProducerClose(TestContext testContext) throws Exception {
        doProducerCloseTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testProducerEnd(TestContext testContext) throws Exception {
        doProducerCloseTestImpl(testContext, true);
    }

    private void doProducerCloseTestImpl(TestContext testContext, boolean z) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(methodName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    async.complete();
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                    async2.complete();
                });
                protonReceiver.open();
            });
        });
        AmqpClient create = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        create.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(methodName, asyncResult -> {
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                amqpSender.exceptionHandler(th -> {
                    atomicBoolean.set(true);
                });
                amqpSender.sendWithAck(AmqpMessage.create().withBody(str).build(), asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded());
                    if (z) {
                        amqpSender.end();
                    } else {
                        amqpSender.close((Handler) null);
                    }
                    create.close(asyncResult -> {
                        testContext.assertTrue(asyncResult.succeeded());
                        async3.complete();
                    });
                });
            });
        });
        try {
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            testContext.assertFalse(atomicBoolean.get(), "exception handler unexpectedly called");
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSenderFlowControlMechanisms(TestContext testContext) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        this.server = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(methodName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.setAutoAccept(false);
                protonReceiver.setPrefetch(0);
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    this.vertx.setTimer(250L, l -> {
                        async2.awaitSuccess();
                        protonReceiver.flow(1);
                    });
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                protonReceiver.open();
                this.vertx.setTimer(250L, l -> {
                    async.awaitSuccess();
                    protonReceiver.flow(1);
                });
            });
        });
        AmqpClient create = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        create.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(methodName, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                testContext.assertTrue(amqpSender.writeQueueFull(), "expected write queue to be full, we have not yet granted credit");
                amqpSender.drainHandler(r14 -> {
                    testContext.assertTrue(async.isSucceeded(), "should have been called after initial credit delay");
                    testContext.assertFalse(amqpSender.writeQueueFull(), "expected write queue not to be full, we just granted credit");
                    amqpSender.send(AmqpMessage.create().withBody(str).build());
                    testContext.assertTrue(amqpSender.writeQueueFull(), "expected write queue to be full, we just used all the credit");
                    amqpSender.drainHandler(r9 -> {
                        testContext.assertTrue(async2.isSucceeded(), "should have been called after 2nd credit delay");
                        testContext.assertFalse(amqpSender.writeQueueFull(), "expected write queue not to be full, we just granted credit");
                        create.close(asyncResult -> {
                            testContext.assertTrue(asyncResult.succeeded());
                            async3.complete();
                        });
                    });
                    async2.complete();
                });
            });
            async.complete();
        });
        async3.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testSenderClosedRemotelyCallsExceptionHandler(TestContext testContext) throws Exception {
        doSenderClosedRemotelyCallsExceptionHandlerTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testSenderClosedRemotelyWithErrorCallsExceptionHandler(TestContext testContext) throws Exception {
        doSenderClosedRemotelyCallsExceptionHandlerTestImpl(testContext, true);
    }

    private void doSenderClosedRemotelyCallsExceptionHandlerTestImpl(TestContext testContext, boolean z) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        this.server = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(methodName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    if (z) {
                        protonReceiver.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                    }
                    protonReceiver.close();
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                protonReceiver.open();
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        this.client.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(methodName, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                amqpSender.exceptionHandler(th -> {
                    testContext.assertNotNull(th, "expected exception");
                    testContext.assertTrue(th instanceof Exception, "expected vertx exception");
                    if (z) {
                        testContext.assertNotNull(th.getCause(), "expected cause");
                    } else {
                        testContext.assertNull(th.getCause(), "expected no cause");
                    }
                    async2.complete();
                    this.client.close(asyncResult -> {
                        if (asyncResult.failed()) {
                            asyncResult.cause().printStackTrace();
                        }
                        testContext.assertTrue(asyncResult.succeeded());
                        async.complete();
                    });
                });
                amqpSender.send(AmqpMessage.create().withBody(str).build());
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testSenderWithTargetCapability(TestContext testContext) throws ExecutionException, InterruptedException {
        String str = "queue";
        Async async = testContext.async();
        Async async2 = testContext.async();
        this.server = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.closeHandler(asyncResult2 -> {
                protonConnection.close();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.closeHandler(asyncResult3 -> {
                    protonReceiver.close();
                });
                testContext.assertNotNull(protonReceiver.getRemoteTarget(), "target should not be null");
                org.apache.qpid.proton.amqp.messaging.Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertFalse(remoteTarget.getDynamic(), "dynamic target should not be requested");
                testContext.assertEquals(this.name.getMethodName(), remoteTarget.getAddress(), "expected target address to be set");
                Symbol[] symbolArr = {Symbol.valueOf(str)};
                Symbol[] capabilities = remoteTarget.getCapabilities();
                testContext.assertTrue(Arrays.equals(symbolArr, capabilities), "Unexpected capabilities: " + Arrays.toString(capabilities));
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.open();
                async.complete();
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        this.client.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(this.name.getMethodName(), new AmqpSenderOptions().addCapability(str), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                async2.complete();
            });
        });
        async.awaitSuccess();
        async2.awaitSuccess();
    }

    @Test(timeout = 20000)
    public void testDynamicSenderWithOptions(TestContext testContext) throws ExecutionException, InterruptedException {
        String uuid = UUID.randomUUID().toString();
        Async async = testContext.async();
        Async async2 = testContext.async();
        this.server = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.closeHandler(asyncResult2 -> {
                protonConnection.close();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.closeHandler(asyncResult3 -> {
                    protonReceiver.close();
                });
                testContext.assertNotNull(protonReceiver.getRemoteTarget(), "target should not be null");
                org.apache.qpid.proton.amqp.messaging.Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertTrue(remoteTarget.getDynamic(), "expected dynamic target to be requested");
                testContext.assertNull(remoteTarget.getAddress(), "expected no target address to be set");
                org.apache.qpid.proton.amqp.messaging.Target copy = remoteTarget.copy();
                copy.setAddress(uuid);
                protonReceiver.setTarget(copy);
                protonReceiver.open();
                async.complete();
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        this.client.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender((String) null, new AmqpSenderOptions().setDynamic(true), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                testContext.assertNotNull(((AmqpSender) asyncResult.result()).address());
                async2.complete();
            });
        });
        async.awaitSuccess();
        async2.awaitSuccess();
    }

    @Test(timeout = 10000)
    public void testAcknowledgementHandling(TestContext testContext) throws Exception {
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        this.server = setupMockServerForAckHandling(testContext, copyOnWriteArrayList);
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(uuid, asyncResult -> {
                if (asyncResult.failed()) {
                    asyncResult.cause().printStackTrace();
                }
                testContext.assertTrue(asyncResult.succeeded());
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                amqpSender.sendWithAck(AmqpMessage.create().withBody(ACCEPT).build(), asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded());
                    countDownLatch.countDown();
                });
                amqpSender.sendWithAck(AmqpMessage.create().withBody(RELEASE).build(), asyncResult2 -> {
                    testContext.assertFalse(asyncResult2.succeeded());
                    testContext.assertTrue(asyncResult2.cause().getMessage().contains("RELEASED"));
                    countDownLatch.countDown();
                });
                amqpSender.sendWithAck(AmqpMessage.create().withBody(MODIFY_FAILED).build(), asyncResult3 -> {
                    testContext.assertFalse(asyncResult3.succeeded());
                    testContext.assertTrue(asyncResult3.cause().getMessage().contains("MODIFIED"));
                    countDownLatch.countDown();
                });
                amqpSender.sendWithAck(AmqpMessage.create().withBody(MODIFY_FAILED_U_H).build(), asyncResult4 -> {
                    testContext.assertFalse(asyncResult4.succeeded());
                    testContext.assertTrue(asyncResult4.cause().getMessage().contains("MODIFIED"));
                    countDownLatch.countDown();
                });
                amqpSender.sendWithAck(AmqpMessage.create().withBody(REJECT).build(), asyncResult5 -> {
                    testContext.assertFalse(asyncResult5.succeeded());
                    testContext.assertTrue(asyncResult5.cause().getMessage().contains("REJECTED"));
                    countDownLatch.countDown();
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Object[]{ACCEPT, RELEASE, MODIFY_FAILED, MODIFY_FAILED_U_H, REJECT});
    }

    private MockServer setupMockServerForAckHandling(TestContext testContext, List<Object> list) throws Exception {
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.setAutoAccept(false);
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertTrue(body instanceof AmqpValue);
                    testContext.assertNotNull(body.getValue());
                    testContext.assertTrue(body.getValue() instanceof String);
                    String str = (String) body.getValue();
                    list.add(str);
                    boolean z = -1;
                    switch (str.hashCode()) {
                        case -1423461112:
                            if (str.equals(ACCEPT)) {
                                z = false;
                                break;
                            }
                            break;
                        case -934710369:
                            if (str.equals(REJECT)) {
                                z = 4;
                                break;
                            }
                            break;
                        case -583173517:
                            if (str.equals(MODIFY_FAILED_U_H)) {
                                z = 3;
                                break;
                            }
                            break;
                        case 1090594823:
                            if (str.equals(RELEASE)) {
                                z = true;
                                break;
                            }
                            break;
                        case 1812406448:
                            if (str.equals(MODIFY_FAILED)) {
                                z = 2;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            protonDelivery.disposition(Accepted.getInstance(), true);
                            return;
                        case true:
                            protonDelivery.disposition(Released.getInstance(), true);
                            return;
                        case true:
                            Modified modified = new Modified();
                            modified.setDeliveryFailed(true);
                            protonDelivery.disposition(modified, true);
                            return;
                        case true:
                            Modified modified2 = new Modified();
                            modified2.setDeliveryFailed(true);
                            modified2.setUndeliverableHere(true);
                            protonDelivery.disposition(modified2, true);
                            return;
                        case true:
                            protonDelivery.disposition(new Rejected(), true);
                            return;
                        default:
                            testContext.fail("Unexpected message payload recieved");
                            return;
                    }
                });
                protonReceiver.open();
            });
        });
    }

    @Test(timeout = 10000)
    public void testCreatingSenderWithoutCreatingConnectionFirst(TestContext testContext) throws Exception {
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        String str = "testCreatingSenderWithoutCreatingConnectionFirst";
        this.server = setupMockServerForCreatingSenderWithoutConnectionFirst(testContext, copyOnWriteArrayList, null);
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).createSender(uuid, asyncResult -> {
            if (asyncResult.failed()) {
                asyncResult.cause().printStackTrace();
            }
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpSender) asyncResult.result()).sendWithAck(AmqpMessage.create().withBody(str).build(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                countDownLatch.countDown();
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Object[]{"testCreatingSenderWithoutCreatingConnectionFirst"});
    }

    @Test(timeout = 10000)
    public void testCreatingSenderWithOptionsWithoutCreatingConnectionFirst(TestContext testContext) throws Exception {
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        String str = "testCreatingSenderWithOptionsWithoutCreatingConnectionFirst";
        this.server = setupMockServerForCreatingSenderWithoutConnectionFirst(testContext, copyOnWriteArrayList, "notUsuallyExplicitlySetForSendersButEasilyVerified");
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).createSender(uuid, new AmqpSenderOptions().setLinkName("notUsuallyExplicitlySetForSendersButEasilyVerified"), asyncResult -> {
            if (asyncResult.failed()) {
                asyncResult.cause().printStackTrace();
            }
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpSender) asyncResult.result()).sendWithAck(AmqpMessage.create().withBody(str).build(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                countDownLatch.countDown();
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Object[]{"testCreatingSenderWithOptionsWithoutCreatingConnectionFirst"});
    }

    private MockServer setupMockServerForCreatingSenderWithoutConnectionFirst(TestContext testContext, List<Object> list, String str) throws Exception {
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                if (str != null) {
                    testContext.assertEquals(str, protonReceiver.getName());
                }
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertTrue(body instanceof AmqpValue);
                    Object value = body.getValue();
                    testContext.assertNotNull(value);
                    list.add(value);
                });
                protonReceiver.open();
            });
        });
    }

    @Test(timeout = 10000)
    public void testMessageDurability(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.server = setupMockServerForDurabilityHandling(testContext, countDownLatch, copyOnWriteArrayList);
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(UUID.randomUUID().toString(), asyncResult -> {
                if (asyncResult.failed()) {
                    asyncResult.cause().printStackTrace();
                }
                testContext.assertTrue(asyncResult.succeeded());
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                amqpSender.send(AmqpMessage.create().withBody(DEFAULT).build());
                amqpSender.send(AmqpMessage.create().withBody(DURABLE).durable(true).build());
                amqpSender.send(AmqpMessage.create().withBody(NON_DURABLE).durable(false).build());
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Object[]{DEFAULT, DURABLE, NON_DURABLE});
    }

    private MockServer setupMockServerForDurabilityHandling(TestContext testContext, CountDownLatch countDownLatch, List<Object> list) throws Exception {
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertTrue(body instanceof AmqpValue);
                    testContext.assertNotNull(body.getValue());
                    testContext.assertTrue(body.getValue() instanceof String);
                    String str = (String) body.getValue();
                    boolean z = -1;
                    switch (str.hashCode()) {
                        case 1445176283:
                            if (str.equals(NON_DURABLE)) {
                                z = 2;
                                break;
                            }
                            break;
                        case 1544803905:
                            if (str.equals(DEFAULT)) {
                                z = false;
                                break;
                            }
                            break;
                        case 2013934299:
                            if (str.equals(DURABLE)) {
                                z = true;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            testContext.assertNull(message.getProperties());
                            testContext.assertFalse(message.isDurable());
                            break;
                        case true:
                            testContext.assertTrue(message.isDurable());
                            break;
                        case true:
                            testContext.assertFalse(message.isDurable());
                            break;
                        default:
                            testContext.fail("Unexpected message payload recieved");
                            break;
                    }
                    list.add(str);
                    countDownLatch.countDown();
                });
                protonReceiver.open();
            });
        });
    }
}
