package io.camunda.zeebe;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.config.AppCfg;
import io.camunda.zeebe.config.StarterCfg;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import io.grpc.ClientInterceptor;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/Starter.class */
/**
 * Load generator that deploys a process and then keeps creating new process instances (or
 * publishing start messages) at a configured, fixed rate.
 *
 * <p>All settings are read from the {@link StarterCfg} section of the supplied {@link AppCfg}.
 * Helpers such as {@code printTopology}, {@code readVariables}, {@code createApp} and the
 * {@code monitoringInterceptor} are not declared in this class (presumably inherited from
 * {@link App}).
 */
public class Starter extends App {
    /** Maximum number of in-flight response futures buffered for the {@code ResponseChecker}. */
    private static final int RESPONSE_QUEUE_CAPACITY = 5000;

    /** Pause between two deployment attempts, in milliseconds. */
    private static final long DEPLOY_RETRY_DELAY_MS = 200L;

    /** How long the shutdown hook waits for the scheduler to terminate, in seconds. */
    private static final long EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 60L;

    private static final Logger THROTTLED_LOGGER = new ThrottledLogger(LoggerFactory.getLogger(Starter.class), Duration.ofSeconds(5));
    private static final Logger LOG = LoggerFactory.getLogger(Starter.class);
    private final AppCfg appCfg;

    Starter(AppCfg appCfg) {
        this.appCfg = appCfg;
    }

    /**
     * Deploys the configured process, schedules instance creation at the configured rate and
     * blocks until the continuation condition reports that the run is over (or the calling
     * thread is interrupted), then stops the scheduler and the response checker.
     *
     * @throws IllegalArgumentException if the configured rate is not positive
     * @throws IllegalStateException if the thread is interrupted while the deployment is retried
     */
    @Override
    public void run() {
        final StarterCfg starterCfg = appCfg.getStarter();
        final int rate = starterCfg.getRate();
        if (rate <= 0) {
            // Fail with a clear message instead of an ArithmeticException (rate == 0) or an
            // IllegalArgumentException from the scheduler (negative period) later on.
            throw new IllegalArgumentException("Expected the starter rate to be positive, but was " + rate);
        }

        final String processId = starterCfg.getProcessId();
        final BlockingQueue<Future<?>> responses = new ArrayBlockingQueue<>(RESPONSE_QUEUE_CAPACITY);
        final ZeebeClient client = createZeebeClient();
        printTopology(client);
        deployProcess(client, starterCfg.getBpmnXmlPath());

        // Work in nanoseconds: a millisecond interval truncates to 0 for rates above 1000/s
        // (which scheduleAtFixedRate rejects) and is inaccurate for rates not dividing 1000.
        final long intervalNanos = Math.max(1L, Math.floorDiv(TimeUnit.SECONDS.toNanos(1), (long) rate));
        LOG.info("Creating an instance every {}ns", intervalNanos);

        final String variables = readVariables(starterCfg.getPayloadPath());
        final BooleanSupplier shouldContinue = createContinuationCondition(starterCfg);
        final CountDownLatch finished = new CountDownLatch(1);

        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(starterCfg.getThreads());
        final ScheduledFuture<?> startTask = executor.scheduleAtFixedRate(
                () -> runStarter(starterCfg, processId, responses, client, variables, shouldContinue, finished),
                0L,
                intervalNanos,
                TimeUnit.NANOSECONDS);

        final ResponseChecker responseChecker = new ResponseChecker(responses);
        responseChecker.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdownGracefully(executor, responseChecker)));

        boolean interrupted = false;
        try {
            finished.await();
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.error("Awaiting of count down latch was interrupted.", e);
        }

        LOG.info("Starter finished");
        startTask.cancel(true);
        executor.shutdown();
        responseChecker.close();

        if (interrupted) {
            // Restore the flag only after cleanup so the cleanup calls above are not affected.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shutdown-hook body: stops the scheduler (waiting a bounded time for running tasks) and
     * closes the response checker if it is still alive.
     */
    private static void shutdownGracefully(ScheduledExecutorService executor, ResponseChecker responseChecker) {
        if (!executor.isShutdown()) {
            executor.shutdown();
            try {
                executor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Shutdown executor service was interrupted", e);
            }
        }
        if (responseChecker.isAlive()) {
            responseChecker.close();
        }
    }

    /**
     * One scheduler tick: signals completion once the continuation condition no longer holds,
     * otherwise sends one start request and enqueues its response future.
     *
     * <p>Failures are logged (throttled) and never propagate, so a failing request does not
     * cancel the periodic task.
     */
    private void runStarter(StarterCfg starterCfg, String processId, BlockingQueue<Future<?>> responses, ZeebeClient client, String variables, BooleanSupplier shouldContinue, CountDownLatch finished) {
        if (!shouldContinue.getAsBoolean()) {
            finished.countDown();
            return;
        }
        try {
            if (starterCfg.isStartViaMessage()) {
                startViaMessage(starterCfg, responses, client, variables);
            } else {
                startViaCommand(starterCfg, processId, responses, client, variables);
            }
        } catch (InterruptedException e) {
            // Typically caused by cancelling the periodic task; keep the interrupt visible.
            Thread.currentThread().interrupt();
            THROTTLED_LOGGER.warn("Interrupted while creating new process instance", e);
        } catch (Exception e) {
            THROTTLED_LOGGER.error("Error on creating new process instance", e);
        }
    }

    /**
     * Publishes a message with the configured name, a random correlation key and a zero
     * time-to-live, and enqueues the response future (blocking while the queue is full).
     */
    private static void startViaMessage(StarterCfg starterCfg, BlockingQueue<Future<?>> responses, ZeebeClient client, String variables) throws InterruptedException {
        responses.put(
                client.newPublishMessageCommand()
                        .messageName(starterCfg.getMsgName())
                        .correlationKey(UUID.randomUUID().toString())
                        .variables(variables)
                        .timeToLive(Duration.ZERO)
                        .send());
    }

    /**
     * Sends a create-instance command for the latest version of the given process and enqueues
     * the response future (blocking while the queue is full). When results are requested, the
     * command awaits the result using the configured request timeout.
     */
    private static void startViaCommand(StarterCfg starterCfg, String processId, BlockingQueue<Future<?>> responses, ZeebeClient client, String variables) throws InterruptedException {
        if (starterCfg.isWithResults()) {
            responses.put(
                    client.newCreateInstanceCommand()
                            .bpmnProcessId(processId)
                            .latestVersion()
                            .variables(variables)
                            .withResult()
                            .requestTimeout(starterCfg.getWithResultsTimeout())
                            .send());
        } else {
            responses.put(
                    client.newCreateInstanceCommand()
                            .bpmnProcessId(processId)
                            .latestVersion()
                            .variables(variables)
                            .send());
        }
    }

    /**
     * Builds a client for the configured gateway address with no job worker threads, the JVM
     * system properties applied and the monitoring interceptor attached; plaintext is used
     * unless TLS is enabled in the configuration.
     */
    private ZeebeClient createZeebeClient() {
        final ZeebeClientBuilder builder = ZeebeClient.newClientBuilder()
                .gatewayAddress(appCfg.getBrokerUrl())
                .numJobWorkerExecutionThreads(0)
                .withProperties(System.getProperties())
                .withInterceptors(new ClientInterceptor[]{monitoringInterceptor});
        if (!appCfg.isTls()) {
            builder.usePlaintext();
        }
        return builder.build();
    }

    /**
     * Deploys the classpath resource, retrying after a short pause until it succeeds.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting to retry
     */
    private void deployProcess(ZeebeClient client, String bpmnXmlPath) {
        while (true) {
            try {
                client.newDeployResourceCommand().addResourceFromClasspath(bpmnXmlPath).send().join();
                return;
            } catch (Exception e) {
                THROTTLED_LOGGER.warn("Failed to deploy process, retrying", e);
                try {
                    Thread.sleep(DEPLOY_RETRY_DELAY_MS);
                } catch (InterruptedException interruption) {
                    // Do not swallow the interrupt: otherwise this loop can never be stopped.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying to deploy process", interruption);
                }
            }
        }
    }

    /**
     * Creates the condition that tells the scheduler whether to keep starting instances.
     *
     * @return a supplier that is always {@code true} when no positive duration limit (in seconds)
     *     is configured; otherwise one that turns {@code false} once that many seconds have
     *     elapsed since this method was called
     */
    private BooleanSupplier createContinuationCondition(StarterCfg starterCfg) {
        final int durationLimit = starterCfg.getDurationLimit();
        if (durationLimit <= 0) {
            return () -> true;
        }
        // Monotonic clock: unaffected by DST or wall-clock adjustments during the run.
        final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(durationLimit);
        return () -> System.nanoTime() - deadlineNanos < 0;
    }

    public static void main(String[] strArr) {
        createApp(Starter::new);
    }
}
