package io.zeebe.test.broker.protocol.clientapi;

import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.test.util.collection.MapBuilder;
import io.zeebe.util.buffer.BufferUtil;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.camunda.bpm.model.bpmn.Bpmn;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;

/* loaded from: input_file:io/zeebe/test/broker/protocol/clientapi/TestTopicClient.class */
/**
 * Test helper that issues commands against one topic partition through a {@link ClientApiRule}
 * and offers predicates for filtering the {@link SubscribedEvent}s that come back.
 *
 * <p>The instance remembers whether it has already opened a topic subscription
 * (see {@link #receiveEvents(Predicate)}); it performs no synchronization of that flag.
 */
public class TestTopicClient {
    public static final String PROP_STATE = "state";
    public static final String PROP_WORKFLOW_BPMN_PROCESS_ID = "bpmnProcessId";
    public static final String PROP_WORKFLOW_BPMN_XML = "bpmnXml";
    public static final String PROP_WORKFLOW_VERSION = "version";
    public static final String PROP_WORKFLOW_PAYLOAD = "payload";
    public static final String PROP_WORKFLOW_INSTANCE_KEY = "workflowInstanceKey";
    public static final String PROP_WORKFLOW_KEY = "workflowKey";
    private final ClientApiRule apiRule;
    private final String topicName;
    private final int partitionId;
    /** Set once {@link #ensureOpenTopicSubscription()} has opened the subscription. */
    private boolean isTopicSubscriptionOpen = false;

    /**
     * @param clientApiRule rule used to build and send every request
     * @param str name of the topic all requests are addressed to
     * @param i id of the partition all requests are addressed to
     */
    public TestTopicClient(ClientApiRule clientApiRule, String str, int i) {
        this.apiRule = clientApiRule;
        this.topicName = str;
        this.partitionId = i;
    }

    /**
     * Sends a {@code CREATE_DEPLOYMENT} command carrying the model as UTF-8 encoded BPMN XML and
     * asserts that the response state is {@code DEPLOYMENT_CREATED}.
     *
     * @param bpmnModelInstance the model to deploy
     * @return the key of the response
     */
    public long deploy(BpmnModelInstance bpmnModelInstance) {
        byte[] bpmnXml = Bpmn.convertToString(bpmnModelInstance).getBytes(StandardCharsets.UTF_8);
        ExecuteCommandResponse response =
                ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                        .topicName(this.topicName)
                        .partitionId(this.partitionId)
                        .eventType(EventType.DEPLOYMENT_EVENT)
                        .command()
                        .put(PROP_STATE, "CREATE_DEPLOYMENT")
                        .put(PROP_WORKFLOW_BPMN_XML, bpmnXml)
                        .done())
                        .sendAndAwait();
        Assertions.assertThat(response.getEvent().get(PROP_STATE)).isEqualTo("DEPLOYMENT_CREATED");
        return response.key();
    }

    /**
     * Sends a {@code CREATE_WORKFLOW_INSTANCE} command without payload and returns the raw
     * response. No assertion is made on the response, so callers can inspect rejections.
     *
     * @param str the BPMN process id to instantiate
     * @return the response to the command
     */
    public ExecuteCommandResponse createWorkflowInstanceWithResponse(String str) {
        return ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .topicName(this.topicName)
                .partitionId(this.partitionId)
                .eventTypeWorkflow()
                .command()
                .put(PROP_STATE, "CREATE_WORKFLOW_INSTANCE")
                .put(PROP_WORKFLOW_BPMN_PROCESS_ID, str)
                .done())
                .sendAndAwait();
    }

    /**
     * Creates a workflow instance without payload and asserts that the response state is
     * {@code WORKFLOW_INSTANCE_CREATED} and that the response position is not negative.
     *
     * @param str the BPMN process id to instantiate
     * @return the key of the response
     */
    public long createWorkflowInstance(String str) {
        // Same request as createWorkflowInstanceWithResponse; only the assertions are added here.
        ExecuteCommandResponse response = createWorkflowInstanceWithResponse(str);
        Assertions.assertThat(response.getEvent().get(PROP_STATE)).isEqualTo("WORKFLOW_INSTANCE_CREATED");
        Assertions.assertThat(response.position()).isGreaterThanOrEqualTo(0L);
        return response.key();
    }

    /**
     * Creates a workflow instance whose payload is the content of the given buffer.
     *
     * @param str the BPMN process id to instantiate
     * @param directBuffer buffer holding the payload; converted with
     *     {@link BufferUtil#bufferAsArray(DirectBuffer)}
     * @return the key of the response
     */
    public long createWorkflowInstance(String str, DirectBuffer directBuffer) {
        return createWorkflowInstance(str, BufferUtil.bufferAsArray(directBuffer));
    }

    /**
     * Creates a workflow instance with the given payload and asserts that the response state is
     * {@code WORKFLOW_INSTANCE_CREATED}.
     *
     * @param str the BPMN process id to instantiate
     * @param bArr the payload put into the command under {@link #PROP_WORKFLOW_PAYLOAD}
     * @return the key of the response
     */
    public long createWorkflowInstance(String str, byte[] bArr) {
        ExecuteCommandResponse response =
                ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                        .topicName(this.topicName)
                        .partitionId(this.partitionId)
                        .eventTypeWorkflow()
                        .command()
                        .put(PROP_STATE, "CREATE_WORKFLOW_INSTANCE")
                        .put(PROP_WORKFLOW_BPMN_PROCESS_ID, str)
                        .put(PROP_WORKFLOW_PAYLOAD, bArr)
                        .done())
                        .sendAndAwait();
        Assertions.assertThat(response.getEvent().get(PROP_STATE)).isEqualTo("WORKFLOW_INSTANCE_CREATED");
        return response.key();
    }

    /**
     * Completes the first locked task of the given type without sending a payload.
     *
     * @param str the task type
     */
    public void completeTaskOfType(String str) {
        completeTaskOfType(str, (byte[]) null);
    }

    /**
     * Completes the first locked task of the given type with the content of the given buffer as
     * payload.
     *
     * @param str the task type
     * @param directBuffer buffer holding the payload; converted with
     *     {@link BufferUtil#bufferAsArray(DirectBuffer)}
     */
    public void completeTaskOfType(String str, DirectBuffer directBuffer) {
        completeTaskOfType(str, BufferUtil.bufferAsArray(directBuffer));
    }

    /**
     * Completes the first locked task of the given type.
     *
     * @param str the task type
     * @param bArr the payload, or {@code null} to send the command without a payload entry
     */
    public void completeTaskOfType(String str, byte[] bArr) {
        completeTask(str, bArr, subscribedEvent -> true);
    }

    /**
     * Completes the first locked task of the given type whose {@code headers} map contains the
     * given workflow instance key. Events without a {@code headers} map or without that key are
     * skipped instead of failing the lookup with an exception.
     *
     * @param str the task type
     * @param j the workflow instance key expected in the task headers
     * @param bArr the payload, or {@code null} to send the command without a payload entry
     */
    public void completeTaskOfWorkflowInstance(String str, long j, byte[] bArr) {
        completeTask(str, bArr, subscribedEvent -> {
            Object headers = subscribedEvent.event().get("headers");
            if (!(headers instanceof Map)) {
                return false;
            }
            // Comparing from the boxed expected key keeps a missing header entry from throwing.
            return Long.valueOf(j).equals(((Map<?, ?>) headers).get(PROP_WORKFLOW_INSTANCE_KEY));
        });
    }

    /**
     * Opens a task subscription for the given type, picks the first subscribed task event in state
     * {@code LOCKED} of that type that also satisfies {@code predicate}, sends a {@code COMPLETE}
     * command for it and asserts that the response state is {@code COMPLETED}.
     *
     * <p>The command echoes the {@code lockOwner} and {@code headers} of the locked event.
     *
     * @param str the task type
     * @param bArr the payload, or {@code null} to send the command without a payload entry
     * @param predicate additional condition the locked task event must satisfy
     * @throws AssertionError if no matching locked task event is found
     */
    public void completeTask(String str, byte[] bArr, Predicate<SubscribedEvent> predicate) {
        // NOTE(review): 1000L is presumably the lock duration - confirm against ClientApiRule.
        this.apiRule.openTaskSubscription(this.topicName, this.partitionId, str, 1000L).await();

        SubscribedEvent lockedTask = this.apiRule.subscribedEvents()
                .filter(taskEvents("LOCKED").and(taskType(str)).and(predicate))
                .findFirst()
                .orElseThrow(() -> new AssertionError("Expected task locked event but not found."));

        MapBuilder command = this.apiRule.createCmdRequest()
                .topicName(this.topicName)
                .partitionId(this.partitionId)
                .key(lockedTask.key())
                .eventTypeTask()
                .command()
                .put(PROP_STATE, "COMPLETE")
                .put("type", str)
                .put("lockOwner", lockedTask.event().get("lockOwner"))
                .put("headers", lockedTask.event().get("headers"));
        if (bArr != null) {
            command.put(PROP_WORKFLOW_PAYLOAD, bArr);
        }

        ExecuteCommandResponse response = ((ExecuteCommandRequestBuilder) command.done()).sendAndAwait();
        Assertions.assertThat(response.getEvent().get(PROP_STATE)).isEqualTo("COMPLETED");
    }

    /**
     * Returns the topic-subscription events that satisfy the given predicate, opening the topic
     * subscription first if this client has not done so yet.
     *
     * @param predicate condition applied after restricting to
     *     {@link SubscriptionType#TOPIC_SUBSCRIPTION} events
     * @return the filtered event stream
     */
    public Stream<SubscribedEvent> receiveEvents(Predicate<SubscribedEvent> predicate) {
        ensureOpenTopicSubscription();
        // NOTE(review): moveMessageStreamToHead() presumably rewinds to the first received
        // message so earlier events are visible again - confirm against ClientApiRule.
        return this.apiRule.moveMessageStreamToHead()
                .subscribedEvents()
                .filter(subscribedEvent -> subscribedEvent.subscriptionType() == SubscriptionType.TOPIC_SUBSCRIPTION)
                .filter(predicate);
    }

    /**
     * Returns the first topic-subscription event that satisfies the given predicate.
     *
     * @param predicate condition the event must satisfy
     * @return the first matching event
     * @throws AssertionError if no matching event is found
     */
    public SubscribedEvent receiveSingleEvent(Predicate<SubscribedEvent> predicate) {
        return receiveEvents(predicate)
                .findFirst()
                .orElseThrow(() -> new AssertionError("no event received"));
    }

    /**
     * Opens the topic subscription named {@code "test"} on first use, asserts that the returned
     * key is not negative, and records that it is open so later calls do nothing.
     */
    private void ensureOpenTopicSubscription() {
        if (this.isTopicSubscriptionOpen) {
            return;
        }
        // NOTE(review): 0L is presumably the start position of the subscription - confirm.
        long subscriptionKey =
                this.apiRule.openTopicSubscription(this.topicName, this.partitionId, "test", 0L).await().key();
        Assertions.assertThat(subscriptionKey).isGreaterThanOrEqualTo(0L);
        this.isTopicSubscriptionOpen = true;
    }

    /**
     * Matches events whose {@code state} entry equals the given value. Events without a
     * {@code state} entry do not match.
     *
     * @param str the expected state
     */
    public static Predicate<SubscribedEvent> state(String str) {
        return subscribedEvent -> {
            Object actualState = subscribedEvent.event().get(PROP_STATE);
            return actualState != null && actualState.equals(str);
        };
    }

    /**
     * Matches events whose {@code workflowInstanceKey} entry equals the given key. Events without
     * that entry do not match.
     *
     * @param j the expected workflow instance key
     */
    public static Predicate<SubscribedEvent> workflowInstanceKey(long j) {
        return subscribedEvent ->
                Long.valueOf(j).equals(subscribedEvent.event().get(PROP_WORKFLOW_INSTANCE_KEY));
    }

    /** Matches events of type {@link EventType#WORKFLOW_INSTANCE_EVENT}. */
    public static Predicate<SubscribedEvent> workflowInstanceEvents() {
        return subscribedEvent -> subscribedEvent.eventType() == EventType.WORKFLOW_INSTANCE_EVENT;
    }

    /**
     * Matches workflow instance events in the given state.
     *
     * @param str the expected state
     */
    public static Predicate<SubscribedEvent> workflowInstanceEvents(String str) {
        return workflowInstanceEvents().and(state(str));
    }

    /**
     * Matches workflow instance events in the given state that carry the given workflow instance
     * key.
     *
     * @param str the expected state
     * @param j the expected workflow instance key
     */
    public static Predicate<SubscribedEvent> workflowInstanceEvents(String str, long j) {
        return workflowInstanceEvents(str).and(workflowInstanceKey(j));
    }

    /** Matches events of type {@link EventType#TASK_EVENT}. */
    public static Predicate<SubscribedEvent> taskEvents() {
        return subscribedEvent -> subscribedEvent.eventType() == EventType.TASK_EVENT;
    }

    /**
     * Matches task events in the given state.
     *
     * @param str the expected state
     */
    public static Predicate<SubscribedEvent> taskEvents(String str) {
        return taskEvents().and(state(str));
    }

    /**
     * Matches events whose {@code type} entry equals the given task type.
     *
     * @param str the expected task type; must not be {@code null}
     */
    public static Predicate<SubscribedEvent> taskType(String str) {
        return subscribedEvent -> str.equals(subscribedEvent.event().get("type"));
    }

    /** Matches events of type {@link EventType#INCIDENT_EVENT}. */
    public static Predicate<SubscribedEvent> incidentEvents() {
        return subscribedEvent -> subscribedEvent.eventType() == EventType.INCIDENT_EVENT;
    }

    /**
     * Matches incident events in the given state.
     *
     * @param str the expected state
     */
    public static Predicate<SubscribedEvent> incidentEvents(String str) {
        return incidentEvents().and(state(str));
    }

    /**
     * Matches workflow events in the given state.
     *
     * @param str the expected state
     */
    public static Predicate<SubscribedEvent> workflowEvents(String str) {
        return workflowEvents().and(state(str));
    }

    /** Matches events of type {@link EventType#WORKFLOW_EVENT}. */
    public static Predicate<SubscribedEvent> workflowEvents() {
        return subscribedEvent -> subscribedEvent.eventType() == EventType.WORKFLOW_EVENT;
    }
}
