package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.ProcessExecutor;
import io.camunda.zeebe.engine.util.client.DeploymentClient;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.bpmn.random.ExecutionPath;
import io.camunda.zeebe.test.util.bpmn.random.ScheduledExecutionStep;
import io.camunda.zeebe.test.util.bpmn.random.TestDataGenerator;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/ReplayStateRandomizedPropertyTest.class */
public class ReplayStateRandomizedPropertyTest {
    private static final String PROCESS_COUNT = System.getProperty("processCount", "3");
    private static final String EXECUTION_PATH_COUNT = System.getProperty("replayExecutionCount", "1");

    @Parameterized.Parameter
    public TestDataGenerator.TestDataRecord record;

    @Rule
    public TestWatcher failedTestDataPrinter = new FailedPropertyBasedTestDataPrinter(this::getDataRecord);

    @Rule
    public final EngineRule engineRule = EngineRule.singlePartition();
    private long lastProcessedPosition = -1;
    private final ProcessExecutor processExecutor = new ProcessExecutor(this.engineRule);

    @Before
    public void init() {
        this.lastProcessedPosition = -1L;
    }

    public TestDataGenerator.TestDataRecord getDataRecord() {
        return this.record;
    }

    @Test
    public void shouldRestoreStateAtEachStepInExecution() {
        DeploymentClient deployment = this.engineRule.deployment();
        List bpmnModels = this.record.getBpmnModels();
        Objects.requireNonNull(deployment);
        bpmnModels.forEach(deployment::withXmlResource);
        deployment.deploy();
        ExecutionPath executionPath = this.record.getExecutionPath();
        for (ScheduledExecutionStep scheduledExecutionStep : executionPath.getSteps()) {
            this.record.setCurrentStep(scheduledExecutionStep);
            this.processExecutor.applyStep(scheduledExecutionStep.getStep());
            stopAndRestartEngineAndCompareStates();
        }
        long position = ((Record) RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED).withElementType(BpmnElementType.PROCESS).withBpmnProcessId(executionPath.getProcessId()).getFirst()).getPosition();
        Awaitility.await("await the last process record to be processed").untilAsserted(() -> {
            Assertions.assertThat(this.engineRule.getLastProcessedPosition()).isGreaterThanOrEqualTo(position);
        });
        stopAndRestartEngineAndCompareStates();
    }

    private void stopAndRestartEngineAndCompareStates() {
        waitForProcessingToStop();
        this.engineRule.pauseProcessing(1);
        Map<ZbColumnFamilies, Map<Object, Object>> collectState = this.engineRule.collectState();
        this.engineRule.stop();
        this.engineRule.start();
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat((StreamProcessor.Phase) this.engineRule.getStreamProcessor(1).getCurrentPhase().join()).isEqualTo(StreamProcessor.Phase.PROCESSING);
        });
        Awaitility.await("await that the replay state is equal to the processing state").untilAsserted(() -> {
            Map<ZbColumnFamilies, Map<Object, Object>> collectState2 = this.engineRule.collectState();
            SoftAssertions softAssertions = new SoftAssertions();
            collectState.entrySet().stream().filter(entry -> {
                return entry.getKey() != ZbColumnFamilies.DEFAULT;
            }).forEach(entry2 -> {
                ZbColumnFamilies zbColumnFamilies = (ZbColumnFamilies) entry2.getKey();
                Map map = (Map) entry2.getValue();
                Map map2 = (Map) collectState2.get(zbColumnFamilies);
                if (map.isEmpty()) {
                    softAssertions.assertThat(map2).describedAs("The state column '%s' should be empty after replay", new Object[]{zbColumnFamilies}).isEmpty();
                } else {
                    softAssertions.assertThat(map2).describedAs("The state column '%s' has different entries after replay", new Object[]{zbColumnFamilies}).containsExactlyInAnyOrderEntriesOf(map);
                }
            });
            softAssertions.assertAll();
        });
    }

    private void waitForProcessingToStop() {
        Awaitility.await("await the last written record to be processed").untilAsserted(() -> {
            ((AbstractBooleanAssert) Assertions.assertThat(this.engineRule.hasReachedEnd()).describedAs("Processing has reached end of the log.", new Object[0])).isTrue();
        });
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<TestDataGenerator.TestDataRecord> getTestRecords() {
        return TestDataGenerator.generateTestRecords(Integer.parseInt(PROCESS_COUNT), Integer.parseInt(EXECUTION_PATH_COUNT));
    }
}
