package io.zeebe.broker.job;

import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.value.job.Headers;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.clientapi.VarDataEncodingEncoder;
import io.zeebe.protocol.impl.record.value.deployment.ResourceType;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.TestUtil;
import io.zeebe.test.util.record.RecordingExporter;
import io.zeebe.util.sched.clock.ControlledActorClock;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import one.util.streamex.StreamEx;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.assertj.core.internal.bytebuddy.utility.RandomString;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/job/ActivateJobsTest.class */
public class ActivateJobsTest {
    /** Job type used by the {@code createJobs(int)} and {@code activateJobs(int)} shortcuts. */
    public static final String JOB_TYPE = "theJobType";
    /** BPMN process id used by {@code deployWorkflow} and {@code createWorkflowInstances}. */
    public static final String PROCESS_ID = "testProcess";
    /** Broker rule; it is the outer rule of {@link #ruleChain}. */
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    /** Client API rule; assigned in the constructor from the broker rule's client address. */
    public ClientApiRule apiRule;

    /** Rule chain built in the constructor: {@link #brokerRule} around {@link #apiRule}. */
    @Rule
    public RuleChain ruleChain;
    /** JSON document used as the default payload by {@code createJobs(String, int)}. */
    public static final String JSON_PAYLOAD = "{\"foo\": \"bar\"}";
    /** {@link #JSON_PAYLOAD} converted by {@code MsgPackUtil.asMsgPackReturnArray}; set as variables in {@code createWorkflowInstance}. */
    public static final byte[] PAYLOAD_MSG_PACK = MsgPackUtil.asMsgPackReturnArray(JSON_PAYLOAD);
    /** Custom header value produced by {@code RandomString.make(128)}; used by {@code shouldActivateJobsWithLongCustomHeaders}. */
    public static final String LONG_CUSTOM_HEADER_VALUE = RandomString.make(128);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/broker/job/ActivateJobsTest$Job.class */
    /**
     * Pairs a job key with the job's value map, as zipped together from the {@code jobKeys}
     * and {@code jobs} entries of an activation response.
     */
    public static class Job {
        final long key;
        final Map<String, Object> value;

        /**
         * @param key the job key
         * @param value the job's value map; stored as-is, not copied
         */
        Job(long key, Map<String, Object> value) {
            this.key = key;
            this.value = value;
        }

        /** @return the job key */
        public long getKey() {
            return key;
        }

        /** @return the job's value map (the same instance passed to the constructor) */
        public Map<String, Object> getValue() {
            return value;
        }
    }

    /**
     * Wires the rules: the client API rule is given the broker rule's client-address supplier,
     * and the rule chain is built with the broker rule as the outer rule around the API rule.
     */
    public ActivateJobsTest() {
        // The bound method reference already null-checks its receiver when it is created.
        this.apiRule = new ClientApiRule(this.brokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    /** Sends an ACTIVATE job-batch command with amount 0 and expects an INVALID_ARGUMENT rejection. */
    @Test
    public void shouldRejectInvalidAmount() {
        // given
        Long timeoutMillis = Duration.ofSeconds(10L).toMillis();
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE)
                .command()
                .put("type", JOB_TYPE)
                .put("worker", "testWorker")
                .put("timeout", timeoutMillis)
                .put("retries", 3)
                .put("amount", 0)
                .put("jobs", Collections.emptyList())
                .done();

        // when
        ExecuteCommandResponse rejection = request.sendAndAwait();

        // then
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.INVALID_ARGUMENT);
        Assertions.assertThat(rejection.getRejectionReason())
                .isEqualTo("Expected to activate job batch with amount to be greater than zero, but it was '0'");
    }

    /** Sends an ACTIVATE job-batch command with a zero timeout and expects an INVALID_ARGUMENT rejection. */
    @Test
    public void shouldRejectInvalidTimeout() {
        // given
        Long timeoutMillis = Duration.ofSeconds(0L).toMillis();
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE)
                .command()
                .put("type", JOB_TYPE)
                .put("worker", "testWorker")
                .put("timeout", timeoutMillis)
                .put("retries", 3)
                .put("amount", 3)
                .put("jobs", Collections.emptyList())
                .done();

        // when
        ExecuteCommandResponse rejection = request.sendAndAwait();

        // then
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.INVALID_ARGUMENT);
        Assertions.assertThat(rejection.getRejectionReason())
                .isEqualTo("Expected to activate job batch with timeout to be greater than zero, but it was '0'");
    }

    /** Sends an ACTIVATE job-batch command with an empty job type and expects an INVALID_ARGUMENT rejection. */
    @Test
    public void shouldRejectInvalidType() {
        // given
        Long timeoutMillis = Duration.ofSeconds(10L).toMillis();
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE)
                .command()
                .put("type", "")
                .put("worker", "testWorker")
                .put("timeout", timeoutMillis)
                .put("retries", 3)
                .put("amount", 3)
                .put("jobs", Collections.emptyList())
                .done();

        // when
        ExecuteCommandResponse rejection = request.sendAndAwait();

        // then
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.INVALID_ARGUMENT);
        Assertions.assertThat(rejection.getRejectionReason())
                .isEqualTo("Expected to activate job batch with type to be present, but it was blank");
    }

    /** Sends an ACTIVATE job-batch command with an empty worker name and expects an INVALID_ARGUMENT rejection. */
    @Test
    public void shouldRejectInvalidWorker() {
        // given
        Long timeoutMillis = Duration.ofSeconds(10L).toMillis();
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE)
                .command()
                .put("type", JOB_TYPE)
                .put("worker", "")
                .put("timeout", timeoutMillis)
                .put("retries", 3)
                .put("amount", 3)
                .put("jobs", Collections.emptyList())
                .done();

        // when
        ExecuteCommandResponse rejection = request.sendAndAwait();

        // then
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.INVALID_ARGUMENT);
        Assertions.assertThat(rejection.getRejectionReason())
                .isEqualTo("Expected to activate job batch with worker to be present, but it was blank");
    }

    /**
     * Creates three jobs, activates a batch of one, and checks the response as well as the exported
     * job and job-batch records. The broker clock is pinned so the expected deadline can be computed.
     */
    @Test
    public void shouldActivateSingleJob() {
        // given
        ControlledActorClock brokerClock = this.brokerRule.getClock();
        brokerClock.pinCurrentTime();
        Long expectedJobKey = createJobs(JOB_TYPE, 3).get(0);
        Duration timeout = Duration.ofMinutes(12L);
        Instant expectedDeadline = brokerClock.getCurrentTime().plusMillis(timeout.toMillis());

        // when
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE)
                .command()
                .put("type", JOB_TYPE)
                .put("worker", "myTestWorker")
                .put("timeout", Long.valueOf(timeout.toMillis()))
                .put("amount", 1)
                .put("jobs", Collections.emptyList())
                .done();
        ExecuteCommandResponse response = request.sendAndAwait();

        // then: the response carries exactly the first created job
        List jobKeys = (List) response.getValue().get("jobKeys");
        List jobs = (List) response.getValue().get("jobs");
        Assertions.assertThat(response.getIntent()).isEqualTo(JobBatchIntent.ACTIVATED);
        Assertions.assertThat(jobKeys).hasSize(1);
        Assertions.assertThat(jobs).hasSize(1);
        Assertions.assertThat((Long) jobKeys.get(0)).isEqualTo(expectedJobKey);
        Map activatedJob = (Map) jobs.get(0);
        Assertions.assertThat(activatedJob).contains(new Map.Entry[]{
                Assertions.entry("retries", 3L),
                Assertions.entry("worker", "myTestWorker"),
                Assertions.entry("deadline", Long.valueOf(expectedDeadline.toEpochMilli())),
                Assertions.entry("type", JOB_TYPE)});
        MsgPackUtil.assertEquality((byte[]) activatedJob.get("payload"), JSON_PAYLOAD);

        // then: the exported job record matches
        Record activatedRecord = (Record) RecordingExporter.jobRecords(JobIntent.ACTIVATED).getFirst();
        io.zeebe.exporter.record.Assertions.assertThat(activatedRecord).hasKey(expectedJobKey.longValue());
        io.zeebe.exporter.record.Assertions.assertThat(activatedRecord.getValue())
                .hasRetries(3)
                .hasWorker("myTestWorker")
                .hasDeadline(expectedDeadline);

        // then: the exported batch record points back at the ACTIVATE command
        Record batchActivatedRecord = (Record) RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATED).getFirst();
        io.zeebe.exporter.record.Assertions.assertThat(batchActivatedRecord)
                .hasKey(response.getKey())
                .hasSourceRecordPosition(((Record) RecordingExporter.jobBatchRecords(JobBatchIntent.ACTIVATE).getFirst()).getPosition())
                .hasTimestamp(brokerClock.getCurrentTime());
    }

    /**
     * Creates five jobs, activates a batch of three, and expects both the response and the exported
     * ACTIVATED job records to carry the keys of the first three created jobs.
     */
    @Test
    public void shouldActivateJobBatch() {
        // given
        List<Long> expectedJobKeys = createJobs(5).subList(0, 3);

        // when
        List<Job> activatedJobs = activateJobs(3);

        // then
        Assertions.assertThat(activatedJobs).extracting(Job::getKey).containsOnlyElementsOf(expectedJobKeys);

        // A wildcard-typed list keeps the element type, so the key extractor type-checks
        // (a raw List would leave the extractor's argument typed as Object).
        List<? extends Record<?>> activatedRecords =
                RecordingExporter.jobRecords(JobIntent.ACTIVATED).limit(3L).collect(Collectors.toList());
        Assertions.assertThat(activatedRecords).extracting(Record::getKey).containsOnlyElementsOf(expectedJobKeys);
    }

    /**
     * Creates twelve jobs and activates three consecutive batches (3, 4, 3); each batch is expected
     * to contain the next slice of created job keys, in creation order.
     */
    @Test
    public void shouldActivateJobBatches() {
        // given
        List<Long> createdJobKeys = createJobs(12);
        List<Long> expectedFirst = createdJobKeys.subList(0, 3);
        List<Long> expectedSecond = createdJobKeys.subList(3, 7);
        List<Long> expectedThird = createdJobKeys.subList(7, 10);

        // when
        List<Job> firstBatch = activateJobs(3);
        List<Job> secondBatch = activateJobs(4);
        List<Job> thirdBatch = activateJobs(3);

        // then
        Assertions.assertThat(firstBatch).extracting(Job::getKey).containsOnlyElementsOf(expectedFirst);
        Assertions.assertThat(secondBatch).extracting(Job::getKey).containsOnlyElementsOf(expectedSecond);
        Assertions.assertThat(thirdBatch).extracting(Job::getKey).containsOnlyElementsOf(expectedThird);
    }

    /** Activating a batch when no job has been created is expected to yield an empty list. */
    @Test
    public void shouldReturnEmptyBatchIfNotJobsAvailable() {
        List<Job> activatedJobs = activateJobs(3);

        Assertions.assertThat(activatedJobs).isEmpty();
    }

    /**
     * Creates five jobs, activates and completes all of them, and expects the exported COMPLETED
     * job records to carry the keys of the created jobs.
     */
    @Test
    public void shouldCompleteActivatedJobs() {
        // given
        List<Long> createdJobKeys = createJobs(5);

        // when
        activateJobs(5).forEach(this::completeJob);

        // then
        // A wildcard-typed list keeps the element type, so the key extractor type-checks
        // (a raw List would leave the extractor's argument typed as Object).
        List<? extends Record<?>> completedRecords =
                RecordingExporter.jobRecords(JobIntent.COMPLETED).limit(5L).collect(Collectors.toList());
        Assertions.assertThat(completedRecords).extracting(Record::getKey).containsOnlyElementsOf(createdJobKeys);
    }

    /**
     * Interleaves jobs of {@link #JOB_TYPE} with jobs of another type and expects an activation for
     * {@link #JOB_TYPE} to return only the seven jobs of that type.
     */
    @Test
    public void shouldOnlyReturnJobsOfCorrectType() {
        // given
        // Copy into a list this test owns: createJobs collects with Collectors.toList(), which
        // makes no guarantee that the returned list is mutable, and a second batch is appended.
        List<Long> expectedJobKeys = new ArrayList<>(createJobs(JOB_TYPE, 3));
        createJobs("differenttheJobType", 5);
        expectedJobKeys.addAll(createJobs(JOB_TYPE, 4));

        // when
        List<Job> activatedJobs = activateJobs(JOB_TYPE, 7);

        // then
        Assertions.assertThat(activatedJobs).extracting(Job::getKey).containsOnlyElementsOf(expectedJobKeys);

        // A wildcard-typed list keeps the element type, so the key extractor type-checks.
        List<? extends Record<?>> activatedRecords = RecordingExporter.jobRecords(JobIntent.ACTIVATED)
                .limit(expectedJobKeys.size())
                .collect(Collectors.toList());
        Assertions.assertThat(activatedRecords).extracting(Record::getKey).containsOnlyElementsOf(expectedJobKeys);
        Assertions.assertThat(activatedRecords).extracting("value.type").containsOnly(JOB_TYPE);
    }

    /**
     * Deploys a workflow with three service tasks, starts ten instances, completes the jobs of each
     * task type in turn, and expects an ELEMENT_COMPLETED record for every created workflow instance.
     */
    @Test
    public void shouldActivateJobsFromWorkflow() {
        // given
        deployWorkflow("foo", TestJarExporter.FOO, "baz");
        List<Long> workflowInstanceKeys = createWorkflowInstances(10);

        // when: work through the task types in the order they were passed to deployWorkflow
        completeJobsOfType("foo", 10);
        completeJobsOfType(TestJarExporter.FOO, 10);
        completeJobsOfType("baz", 10);

        // then
        // Keep only records whose key equals their workflow instance key, and map to that key
        // inside the typed stream (a raw List would leave the extractor's argument typed as Object).
        List<Long> completedInstanceKeys = RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_COMPLETED)
                .filter(record -> record.getKey() == record.getValue().getWorkflowInstanceKey())
                .limit(10L)
                .map(record -> record.getValue().getWorkflowInstanceKey())
                .collect(Collectors.toList());
        Assertions.assertThat(completedInstanceKeys).containsOnlyElementsOf(workflowInstanceKeys);
    }

    /** Waits until {@code amount} jobs of {@code type} were created, then activates and completes them. */
    private void completeJobsOfType(String type, int amount) {
        TestUtil.waitUntil(() ->
                RecordingExporter.jobRecords(JobIntent.CREATED).withType(type).limit(amount).count() == amount);
        activateJobs(type, amount).forEach(this::completeJob);
    }

    /**
     * Deploys a process whose service task carries {@link #LONG_CUSTOM_HEADER_VALUE} as custom
     * header {@code foo}, completes its job, and expects the exported ACTIVATED job record to
     * contain that header value unchanged.
     */
    @Test
    public void shouldActivateJobsWithLongCustomHeaders() {
        // given
        BpmnModelInstance process = Bpmn.createExecutableProcess("processId")
                .startEvent()
                .serviceTask("task", task -> {
                    task.zeebeTaskType("taskType").zeebeTaskHeader("foo", LONG_CUSTOM_HEADER_VALUE);
                })
                .endEvent()
                .done();
        // Encode explicitly so the deployed bytes do not depend on the platform default charset.
        byte[] resource = Bpmn.convertToString(process).getBytes(StandardCharsets.UTF_8);
        this.apiRule.partitionClient().deployWithResponse(resource);
        this.apiRule.partitionClient().createWorkflowInstance(creation -> creation.setBpmnProcessId("processId"));

        // when
        this.apiRule.partitionClient().completeJobOfType("taskType");

        // then
        // No raw Record cast here: the typed stream keeps the value type, which getCustomHeaders() needs.
        Object headerValue = RecordingExporter.jobRecords(JobIntent.ACTIVATED)
                .limit(1L)
                .getFirst()
                .getValue()
                .getCustomHeaders()
                .get("foo");
        Assertions.assertThat(headerValue).isEqualTo(LONG_CUSTOM_HEADER_VALUE);
    }

    /**
     * Deploys a one-task workflow, starts an instance, activates the resulting job and compares the
     * activated job's fields, headers and custom headers with the exported CREATED job record.
     * The broker clock is pinned so the expected deadline can be computed.
     */
    @Test
    public void shouldFetchFullJobRecordFromWorkflow() {
        // given
        ControlledActorClock brokerClock = this.brokerRule.getClock();
        brokerClock.pinCurrentTime();
        Duration timeout = Duration.ofMinutes(4L);
        Instant expectedDeadline = brokerClock.getCurrentTime().plusMillis(timeout.toMillis());
        deployWorkflow(JOB_TYPE);
        createWorkflowInstance(PROCESS_ID);
        Record createdRecord = (Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(JOB_TYPE).getFirst();

        // when
        Map<String, Object> activatedJob = activateJobs(JOB_TYPE, "testWorker", timeout, 1).get(0).getValue();

        // then
        Assertions.assertThat(activatedJob).contains(new Map.Entry[]{
                Assertions.entry("type", JOB_TYPE),
                Assertions.entry("worker", "testWorker"),
                Assertions.entry("retries", 3L),
                Assertions.entry("deadline", Long.valueOf(expectedDeadline.toEpochMilli()))});
        MsgPackUtil.assertEquality((byte[]) activatedJob.get("payload"), "{'foo': 'bar'}");

        Map activatedHeaders = (Map) activatedJob.get("headers");
        Headers createdHeaders = createdRecord.getValue().getHeaders();
        Assertions.assertThat(activatedHeaders).contains(new Map.Entry[]{
                Assertions.entry("bpmnProcessId", createdHeaders.getBpmnProcessId()),
                Assertions.entry("workflowDefinitionVersion", Long.valueOf(createdHeaders.getWorkflowDefinitionVersion())),
                Assertions.entry("workflowKey", Long.valueOf(createdHeaders.getWorkflowKey())),
                Assertions.entry("workflowInstanceKey", Long.valueOf(createdHeaders.getWorkflowInstanceKey())),
                Assertions.entry("elementId", createdHeaders.getElementId()),
                Assertions.entry("elementInstanceKey", Long.valueOf(createdHeaders.getElementInstanceKey()))});

        Map activatedCustomHeaders = (Map) activatedJob.get("customHeaders");
        Assertions.assertThat(activatedCustomHeaders).isEqualTo(createdRecord.getValue().getCustomHeaders());
    }

    /**
     * Creates three jobs whose payload string is a third of
     * {@code VarDataEncodingEncoder.lengthMaxValue()} long and expects a request for three jobs to
     * return only two, with the remaining job returned by the next activation.
     */
    @Test
    public void shouldLimitJobsInBatch() {
        // given
        int payloadValueLength = VarDataEncodingEncoder.lengthMaxValue() / 3;
        String largePayload = "{\"key\": \"" + RandomString.make(payloadValueLength) + "\"}";
        createJobs(JOB_TYPE, 3, largePayload);

        // when / then
        List<Job> firstBatch = activateJobs(JOB_TYPE, 3);
        Assertions.assertThat(firstBatch).hasSize(2);

        List<Job> secondBatch = activateJobs(JOB_TYPE, 1);
        Assertions.assertThat(secondBatch).hasSize(1);
    }

    /**
     * Creates {@code amount} jobs of type {@link #JOB_TYPE}.
     *
     * @return the keys of the created jobs, in creation order
     */
    private List<Long> createJobs(int amount) {
        return createJobs(JOB_TYPE, amount);
    }

    /**
     * Creates {@code amount} jobs of the given type with {@link #JSON_PAYLOAD} as payload.
     *
     * @return the keys of the created jobs, in creation order
     */
    private List<Long> createJobs(String type, int amount) {
        return createJobs(type, amount, JSON_PAYLOAD);
    }

    /**
     * Creates {@code amount} jobs of the given type, one after another, each with the given payload.
     *
     * @param type job type passed to {@code createJob}
     * @param amount number of jobs to create
     * @param payload payload passed to {@code createJob}
     * @return the keys of the created jobs, in creation order
     */
    private List<Long> createJobs(String type, int amount, String payload) {
        return IntStream.range(0, amount)
                .mapToObj(index -> Long.valueOf(createJob(type, payload)))
                .collect(Collectors.toList());
    }

    /**
     * Creates a single job through the partition client, configuring its service task with three
     * retries.
     *
     * @param type job type
     * @param payload payload handed to the partition client
     * @return the value returned by the partition client's {@code createJob}
     */
    private long createJob(String type, String payload) {
        return this.apiRule
                .partitionClient()
                .createJob(
                        type,
                        task -> {
                            task.zeebeTaskRetries(3);
                        },
                        payload);
    }

    /** Activates up to {@code amount} jobs of type {@link #JOB_TYPE}. */
    private List<Job> activateJobs(int amount) {
        return activateJobs(JOB_TYPE, amount);
    }

    /** Activates up to {@code amount} jobs of the given type as worker {@code testWorker} with a five-minute timeout. */
    private List<Job> activateJobs(String type, int amount) {
        Duration timeout = Duration.ofMinutes(5L);
        return activateJobs(type, "testWorker", timeout, amount);
    }

    /**
     * Sends an ACTIVATE job-batch command and pairs each returned job key with its job value.
     *
     * @param type job type to activate
     * @param worker worker name sent with the command
     * @param timeout activation timeout; sent in milliseconds
     * @param amount requested number of jobs
     * @return one {@link Job} per entry of the response's {@code jobKeys}/{@code jobs} lists
     */
    private List<Job> activateJobs(String type, String worker, Duration timeout, int amount) {
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE)
                .command()
                .put("type", type)
                .put("worker", worker)
                .put("timeout", Long.valueOf(timeout.toMillis()))
                .put("amount", Integer.valueOf(amount))
                .put("jobs", Collections.emptyList())
                .done();
        Map<String, Object> response = request.sendAndAwait().getValue();

        // The response is an untyped map; the casts give the zip function real element types
        // (with raw lists the Job constructor would be handed Object arguments).
        @SuppressWarnings("unchecked")
        List<Long> jobKeys = (List<Long>) response.get("jobKeys");
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> jobs = (List<Map<String, Object>>) response.get("jobs");

        return StreamEx.zip(jobKeys, jobs, Job::new).collect(Collectors.toList());
    }

    /**
     * Sends a COMPLETE job command for the given job, echoing back its value map.
     *
     * @return the key of the command response
     */
    private long completeJob(Job job) {
        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .type(ValueType.JOB, JobIntent.COMPLETE)
                .key(job.getKey())
                .command()
                .putAll(job.getValue())
                .done();
        return request.sendAndAwait().getKey();
    }

    /**
     * Deploys a process with id {@link #PROCESS_ID}: a start event, one service task per given
     * task type (in order), and an end event. Each task uses its type as element id, has three
     * retries and a custom header {@code model=true}.
     *
     * @param taskTypes job types, one service task each
     * @return the key of the deployment command response
     */
    private long deployWorkflow(String... taskTypes) {
        // NOTE(review): the builder is declared as ServiceTaskBuilder although it is first assigned
        // the result of startEvent() - confirm the declared type against the model builder API.
        ServiceTaskBuilder processBuilder = Bpmn.createExecutableProcess(PROCESS_ID).startEvent();
        for (String taskType : taskTypes) {
            processBuilder = processBuilder.serviceTask(taskType, task -> {
                task.zeebeTaskType(taskType).zeebeTaskRetries(3).zeebeTaskHeader("model", "true");
            });
        }
        BpmnModelInstance process = processBuilder.endEvent().done();
        Map<String, Object> resource = deploymentResource(bpmnXml(process), "process.bpmn");

        ExecuteCommandRequestBuilder request = (ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest()
                .partitionId(0)
                .type(ValueType.DEPLOYMENT, DeploymentIntent.CREATE)
                .command()
                .put("resources", Collections.singletonList(resource))
                .done();
        return request.sendAndAwait().getKey();
    }

    /**
     * Builds the map describing one BPMN XML deployment resource.
     *
     * @param resource resource content, stored under {@code resource}
     * @param name resource name, stored under {@code resourceName}
     * @return a new mutable map with keys {@code resource}, {@code resourceType} and {@code resourceName}
     */
    private Map<String, Object> deploymentResource(byte[] resource, String name) {
        // Typed map instead of a raw HashMap, so no unchecked conversion happens on return.
        Map<String, Object> deploymentResource = new HashMap<>();
        deploymentResource.put("resource", resource);
        deploymentResource.put("resourceType", ResourceType.BPMN_XML);
        deploymentResource.put("resourceName", name);
        return deploymentResource;
    }

    /**
     * Writes the model to an in-memory stream via {@code Bpmn.writeModelToStream}.
     *
     * @return the bytes written for the model
     */
    private byte[] bpmnXml(BpmnModelInstance model) {
        ByteArrayOutputStream xmlOut = new ByteArrayOutputStream();
        Bpmn.writeModelToStream(xmlOut, model);
        return xmlOut.toByteArray();
    }

    /**
     * Creates {@code amount} workflow instances of {@link #PROCESS_ID}, one after another.
     *
     * @return the instance keys, in creation order
     */
    private List<Long> createWorkflowInstances(int amount) {
        return Collections.nCopies(amount, PROCESS_ID).stream()
                .map(this::createWorkflowInstance)
                .collect(Collectors.toList());
    }

    /**
     * Creates one workflow instance of the given process with {@link #PAYLOAD_MSG_PACK} as variables.
     *
     * @param bpmnProcessId id of the process to instantiate
     * @return the instance key reported by the partition client
     */
    private long createWorkflowInstance(String bpmnProcessId) {
        return this.apiRule
                .partitionClient()
                .createWorkflowInstance(creation ->
                        creation.setBpmnProcessId(bpmnProcessId).setVariables(new UnsafeBuffer(PAYLOAD_MSG_PACK)))
                .getInstanceKey();
    }
}
