package io.vertx.amqp;

import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;

/* loaded from: input_file:io/vertx/amqp/ReceiverCreditTest.class */
/**
 * Exercises receiver creation of the AMQP client against an in-process {@link MockServer}.
 *
 * <p>The tests check the link credit observed by the server-side sender when a receiver
 * attaches, and that a dynamic receiver reports the address assigned by the server.
 */
public class ReceiverCreditTest extends BareTestBase {
    private MockServer server;

    /**
     * Runs the base class tear-down and then closes the mock server, if one was started.
     *
     * <p>The server is closed in a {@code finally} block so that it is still released
     * when the base class tear-down throws.
     *
     * @throws InterruptedException if the base class tear-down is interrupted
     */
    @Override // io.vertx.amqp.BareTestBase
    public void tearDown() throws InterruptedException {
        try {
            super.tearDown();
        } finally {
            if (server != null) {
                server.close();
            }
        }
    }

    /** Expects 1000 credits to be granted when no buffer size is configured on the receiver. */
    @Test(timeout = 20000)
    public void testInitialCredit(TestContext testContext) throws Exception {
        doConsumerInitialCreditTestImpl(testContext, false, 1000);
    }

    /** Expects the granted credit to equal the configured maximum number of buffered messages. */
    @Test(timeout = 20000)
    public void testInitialCreditInfluencedByConsumerBufferSize(TestContext testContext) throws Exception {
        doConsumerInitialCreditTestImpl(testContext, true, 42);
    }

    /**
     * Starts a mock server, attaches a receiver, and verifies both the credit seen by the
     * server-side sender and the delivery of a single message sent in response.
     *
     * @param testContext      the Vert.x Unit context used for assertions and completion tracking
     * @param setMaxBuffered   when {@code true}, {@code expectedCredit} is applied to the receiver
     *                         options via {@code setMaxBufferedMessages}
     * @param expectedCredit   the credit the server-side sender is expected to observe
     * @throws Exception if the mock server cannot be started
     */
    private void doConsumerInitialCreditTestImpl(TestContext testContext, boolean setMaxBuffered, int expectedCredit)
            throws Exception {
        String testAddress = this.name.getMethodName();
        String sentContent = "myMessageContent-" + testAddress;
        AtomicBoolean firstDrainCall = new AtomicBoolean();
        Async creditVerified = testContext.async();
        Async messageReceived = testContext.async();

        this.server = new MockServer(this.vertx, serverConnection -> {
            serverConnection.openHandler(opened -> {
                serverConnection.closeHandler(closed -> serverConnection.close());
                serverConnection.open();
            });
            serverConnection.sessionOpenHandler(session -> session.open());
            serverConnection.senderOpenHandler(serverSender -> {
                Source remoteSource = serverSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(testAddress, remoteSource.getAddress(), "expected given address");
                serverSender.setSource(remoteSource.copy());

                serverSender.sendQueueDrainHandler(drained -> {
                    // Only the first invocation is checked; it also triggers the single message send.
                    if (firstDrainCall.compareAndSet(false, true)) {
                        testContext.assertEquals(expectedCredit, drained.getCredit(), "unexpected initial credit");
                        testContext.assertFalse(drained.sendQueueFull(), "expected send queue not to be full");
                        creditVerified.complete();

                        Message protonMessage = Proton.message();
                        protonMessage.setBody(new AmqpValue(sentContent));
                        drained.send(protonMessage);
                    }
                });
                serverSender.open();
            });
        });

        this.client = AmqpClient.create(this.vertx,
                new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        this.client.connect(connectResult -> {
            testContext.assertTrue(connectResult.succeeded());
            AmqpReceiverOptions options = new AmqpReceiverOptions();
            if (setMaxBuffered) {
                options.setMaxBufferedMessages(expectedCredit);
            }
            AmqpConnection connection = connectResult.result();
            connection.createReceiver(testAddress, options, receiverResult -> {
                testContext.assertTrue(receiverResult.succeeded());
                AmqpReceiver receiver = receiverResult.result();
                receiver.handler(message -> {
                    testContext.assertNotNull(message.bodyAsString(), "amqp message body content was null");
                    testContext.assertEquals(sentContent, message.bodyAsString(), "amqp message body not as expected");
                    messageReceived.complete();
                });
            });
        });

        creditVerified.awaitSuccess();
        messageReceived.awaitSuccess();
    }

    /**
     * Verifies that a dynamic receiver requests a dynamic source without an address, and that
     * the client reports the address the server assigned when opening the link.
     *
     * @throws ExecutionException   if the mock server fails to start
     * @throws InterruptedException if starting the mock server is interrupted
     */
    @Test(timeout = 20000)
    public void testDynamicReceiver(TestContext testContext) throws ExecutionException, InterruptedException {
        String dynamicAddress = UUID.randomUUID().toString();
        Async senderAttached = testContext.async();
        // Tracks the client-side check so the test cannot finish before it has run.
        Async addressVerified = testContext.async();

        this.server = new MockServer(this.vertx, serverConnection -> {
            serverConnection.openHandler(opened -> serverConnection.open());
            serverConnection.sessionOpenHandler(session -> session.open());
            serverConnection.closeHandler(closed -> serverConnection.close());
            serverConnection.senderOpenHandler(serverSender -> {
                serverSender.closeHandler(senderClosed -> serverSender.close());

                Source remoteSource = serverSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source to be requested");
                testContext.assertNull(remoteSource.getAddress(), "expected no source address to be set");

                // Answer the dynamic request with a source carrying the generated address.
                Source assignedSource = remoteSource.copy();
                assignedSource.setAddress(dynamicAddress);
                serverSender.setSource(assignedSource);
                serverSender.open();
                senderAttached.complete();
            });
        });

        this.client = AmqpClient.create(this.vertx,
                new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        this.client.connect(connectResult -> {
            testContext.assertTrue(connectResult.succeeded());
            AmqpConnection connection = connectResult.result();
            connection.createDynamicReceiver(receiverResult -> {
                testContext.assertTrue(receiverResult.succeeded());
                AmqpReceiver receiver = receiverResult.result();
                testContext.assertEquals(dynamicAddress, receiver.address(), "unexpected dynamic receiver address");
                addressVerified.complete();
            });
        });

        senderAttached.awaitSuccess();
        addressVerified.awaitSuccess();
    }
}
