package io.zeebe.broker.subscription.message;

import io.zeebe.broker.subscription.message.processor.MessageObserver;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.test.MsgPackConstants;
import io.zeebe.exporter.record.Record;
import io.zeebe.msgpack.spec.MsgPackHelper;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient;
import io.zeebe.test.util.MsgPackUtil;
import java.util.Map;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/subscription/message/PublishMessageTest.class */
/**
 * Exercises the {@code MESSAGE} / {@code PUBLISH} command through the client API of an embedded
 * broker and checks both the command responses and the records read back through a
 * {@link PartitionTestClient}.
 */
public class PublishMessageTest {
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule();
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    /** Chains the rules with the broker rule outermost and the client API rule around it. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    private PartitionTestClient testClient;

    /** Obtains a fresh partition client from the API rule before every test. */
    @Before
    public void setup() {
        testClient = apiRule.partitionClient();
    }

    /** A minimal publish command yields a PUBLISHED event with empty payload and message id. */
    @Test
    public void shouldPublishMessage() {
        // when
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 1000)
                        .done())
                .sendAndAwait();

        // then: the response carries the published event with the expected fields, in order
        Assertions.assertThat(response.getRecordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
        Assertions.assertThat(response.getValue())
            .containsExactly(
                Assertions.entry("name", "order canceled"),
                Assertions.entry("correlationKey", "order-123"),
                Assertions.entry("timeToLive", 1000L),
                Assertions.entry("payload", MsgPackHelper.EMTPY_OBJECT),
                Assertions.entry("messageId", ""));

        // and: the same event is readable from the partition
        // NOTE(review): Record is used raw here because its value type is not imported in this file.
        Record publishedEvent = testClient.receiveFirstMessageEvent(MessageIntent.PUBLISHED);
        Assertions.assertThat(publishedEvent.getKey()).isEqualTo(response.getKey());
        Assertions.assertThat(MsgPackUtil.asMsgPackReturnArray(publishedEvent.getValue().getPayload()))
            .isEqualTo(MsgPackHelper.EMTPY_OBJECT);
        io.zeebe.exporter.record.Assertions.assertThat(publishedEvent.getValue())
            .hasName("order canceled")
            .hasCorrelationKey("order-123")
            .hasTimeToLive(1000L)
            .hasMessageId("");
    }

    /** A payload given in the command is echoed in the PUBLISHED response. */
    @Test
    public void shouldPublishMessageWithPayload() {
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 1000)
                        .put("payload", MsgPackConstants.MSGPACK_PAYLOAD)
                        .done())
                .sendAndAwait();

        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
        Assertions.assertThat(response.getValue())
            .contains(Assertions.entry("payload", MsgPackConstants.MSGPACK_PAYLOAD));
    }

    /** A message id given in the command is echoed in the PUBLISHED response. */
    @Test
    public void shouldPublishMessageWithMessageId() {
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 1000)
                        .put("messageId", "msg-1")
                        .done())
                .sendAndAwait();

        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
        Assertions.assertThat(response.getValue()).contains(Assertions.entry("messageId", "msg-1"));
    }

    /** A time-to-live of zero is accepted and echoed in the response. */
    @Test
    public void shouldPublishMessageWithZeroTTL() {
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 0)
                        .done())
                .sendAndAwait();

        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
        Assertions.assertThat(response.getValue()).contains(Assertions.entry("timeToLive", 0L));
    }

    /** A negative time-to-live is accepted and echoed in the response. */
    @Test
    public void shouldPublishMessageWithNegativeTTL() {
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", -1L)
                        .done())
                .sendAndAwait();

        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
        Assertions.assertThat(response.getValue()).contains(Assertions.entry("timeToLive", -1L));
    }

    /** Same name and correlation key but a different message id is published. */
    @Test
    public void shouldPublishSecondMessageWithDifferenId() {
        // given
        publishMessage("order canceled", "order-123", "msg-1");

        // when
        ExecuteCommandResponse response = publishMessage("order canceled", "order-123", "msg-2");

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
    }

    /** Same correlation key and message id but a different name is published. */
    @Test
    public void shouldPublishSecondMessageWithDifferentName() {
        // given
        publishMessage("order canceled", "order-123", "msg-1");

        // when
        ExecuteCommandResponse response = publishMessage("order shipped", "order-123", "msg-1");

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
    }

    /** Same name and message id but a different correlation key is published. */
    @Test
    public void shouldPublishSecondMessageWithDiffentCorrelationKey() {
        // given
        publishMessage("order canceled", "order-123", "msg-1");

        // when
        ExecuteCommandResponse response = publishMessage("order canceled", "order-456", "msg-1");

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
    }

    /** An identical message with an empty message id can be published twice. */
    @Test
    public void shouldPublishSameMessageWithEmptyId() {
        // given
        publishMessage("order canceled", "order-123", "");

        // when
        ExecuteCommandResponse response = publishMessage("order canceled", "order-123", "");

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
    }

    /** An identical message without any message id property can be published twice. */
    @Test
    public void shouldPublishSameMessageWithoutId() {
        // given: a first message that carries no "messageId" property at all
        ((ExecuteCommandRequestBuilder)
                apiRule
                    .createCmdRequest()
                    .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                    .command()
                    .put("name", "order canceled")
                    .put("correlationKey", "order-123")
                    .put("timeToLive", 1000)
                    .done())
            .sendAndAwait();

        // when: the very same command is sent again
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 1000)
                        .done())
                .sendAndAwait();

        // then
        Assertions.assertThat(response.getIntent()).isEqualTo(MessageIntent.PUBLISHED);
    }

    /** Re-publishing an identical message with a non-empty id is rejected as ALREADY_EXISTS. */
    @Test
    public void shouldRejectToPublishSameMessageWithId() {
        // given
        publishMessage("order canceled", "order-123", "msg-1");

        // when
        ExecuteCommandResponse response = publishMessage("order canceled", "order-123", "msg-1");

        // then: the response is a rejection ...
        Assertions.assertThat(response.getRecordType()).isEqualTo(RecordType.COMMAND_REJECTION);
        Assertions.assertThat(response.getRejectionType()).isEqualTo(RejectionType.ALREADY_EXISTS);

        // ... and the rejection record read from the partition has the same rejection type
        Record rejection =
            (Record)
                testClient
                    .receiveMessages()
                    .onlyCommandRejections()
                    .withIntent(MessageIntent.PUBLISH)
                    .getFirst();
        Assertions.assertThat(rejection.getMetadata().getRejectionType())
            .isEqualTo(RejectionType.ALREADY_EXISTS);
    }

    /** After the clock passes the check interval plus the TTL, a DELETED event is written. */
    @Test
    public void shouldDeleteMessageAfterTTL() {
        // given
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 100L)
                        .done())
                .sendAndAwait();
        PartitionTestClient partitionClient = apiRule.partitionClient();

        // when: the broker clock is moved forward by the check interval plus the 100 ms TTL
        brokerRule
            .getClock()
            .addTime(MessageObserver.MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL.plusMillis(100L));

        // then
        Record deletedEvent = partitionClient.receiveFirstMessageEvent(MessageIntent.DELETED);
        Assertions.assertThat(deletedEvent.getKey()).isEqualTo(response.getKey());
        Assertions.assertThat(MsgPackUtil.asMsgPackReturnArray(deletedEvent.getValue().getPayload()))
            .isEqualTo(MsgPackHelper.EMTPY_OBJECT);
        io.zeebe.exporter.record.Assertions.assertThat(deletedEvent.getValue())
            .hasName("order canceled")
            .hasCorrelationKey("order-123")
            .hasTimeToLive(100L)
            .hasMessageId("");
    }

    /** With a TTL of zero, advancing the clock by one check interval yields a DELETED event. */
    @Test
    public void shouldDeleteMessageImmediatelyWithZeroTTL() {
        // given
        ExecuteCommandResponse response =
            ((ExecuteCommandRequestBuilder)
                    apiRule
                        .createCmdRequest()
                        .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                        .command()
                        .put("name", "order canceled")
                        .put("correlationKey", "order-123")
                        .put("timeToLive", 0L)
                        .done())
                .sendAndAwait();

        // when
        brokerRule.getClock().addTime(MessageObserver.MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL);

        // then
        Record deletedEvent = testClient.receiveFirstMessageEvent(MessageIntent.DELETED);
        Assertions.assertThat(deletedEvent.getKey()).isEqualTo(response.getKey());
        Assertions.assertThat(MsgPackUtil.asMsgPackReturnArray(deletedEvent.getValue().getPayload()))
            .isEqualTo(MsgPackHelper.EMTPY_OBJECT);
        io.zeebe.exporter.record.Assertions.assertThat(deletedEvent.getValue())
            .hasName("order canceled")
            .hasCorrelationKey("order-123")
            .hasTimeToLive(0L)
            .hasMessageId("");
    }

    /** Sending a publish command without a "name" property fails with a descriptive message. */
    @Test
    public void shouldFailToPublishMessageWithoutName() {
        ExecuteCommandRequestBuilder request =
            (ExecuteCommandRequestBuilder)
                apiRule
                    .createCmdRequest()
                    .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                    .command()
                    .put("correlationKey", "order-123")
                    .put("timeToLive", 1000)
                    .done();

        Assertions.assertThatThrownBy(() -> request.sendAndAwait())
            .hasMessageContaining("Property 'name' has no valid value");
    }

    /** Sending a publish command without a "correlationKey" property fails. */
    @Test
    public void shouldFailToPublishMessageWithoutCorrelationKey() {
        ExecuteCommandRequestBuilder request =
            (ExecuteCommandRequestBuilder)
                apiRule
                    .createCmdRequest()
                    .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                    .command()
                    .put("name", "order canceled")
                    .put("timeToLive", 1000)
                    .done();

        Assertions.assertThatThrownBy(() -> request.sendAndAwait())
            .hasMessageContaining("Property 'correlationKey' has no valid value");
    }

    /** Sending a publish command without a "timeToLive" property fails. */
    @Test
    public void shouldFailToPublishMessageWithoutTimeToLive() {
        ExecuteCommandRequestBuilder request =
            (ExecuteCommandRequestBuilder)
                apiRule
                    .createCmdRequest()
                    .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                    .command()
                    .put("name", "order canceled")
                    .put("correlationKey", "order-123")
                    .done();

        Assertions.assertThatThrownBy(() -> request.sendAndAwait())
            .hasMessageContaining("Property 'timeToLive' has no valid value");
    }

    /**
     * Sends a publish command with a fixed time-to-live of 1000 and waits for the response.
     *
     * @param name value of the command's "name" property
     * @param correlationKey value of the command's "correlationKey" property
     * @param messageId value of the command's "messageId" property
     * @return the response received for the command
     */
    private ExecuteCommandResponse publishMessage(String name, String correlationKey, String messageId) {
        return ((ExecuteCommandRequestBuilder)
                apiRule
                    .createCmdRequest()
                    .type(ValueType.MESSAGE, MessageIntent.PUBLISH)
                    .command()
                    .put("name", name)
                    .put("correlationKey", correlationKey)
                    .put("timeToLive", 1000)
                    .put("messageId", messageId)
                    .done())
            .sendAndAwait();
    }
}
