package ru.fix.stdlib.batching;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.fix.aggregating.profiler.Profiler;
import ru.fix.dynamic.property.api.DynamicProperty;
import ru.fix.stdlib.concurrency.threads.NamedExecutors;
import ru.fix.stdlib.concurrency.threads.ProfiledThreadPoolExecutor;

/* loaded from: input_file:ru/fix/stdlib/batching/BatchingManagerThread.class */
/**
 * Manager loop of the batching subsystem.
 *
 * <p>The loop runs on its own single-thread executor. On every iteration it waits until
 * a permit of {@code batchProcessorsTracker} can be obtained, looks for a per-key queue
 * whose batch is full or whose oldest operation has exceeded the batch timeout, drains
 * up to one batch from that queue and submits a {@code BatchProcessor} to the batch
 * processor pool.
 *
 * @param <ConfigT>  type of the configuration object handed to every batch processor
 * @param <PayloadT> type of the payload carried by an {@code Operation}
 * @param <KeyT>     type of the key that groups pending operations into queues
 */
class BatchingManagerThread<ConfigT, PayloadT, KeyT> implements Runnable {
    private static final String THREAD_INTERRUPTED_MESSAGE = "BatchProcessorManager thread interrupted";
    private static final Logger log = LoggerFactory.getLogger(BatchingManagerThread.class);

    /** Upper bound for a single attempt to obtain a permit, so the shutdown flag is re-checked regularly. */
    private static final long BATCH_PROCESSOR_PERMIT_POLL_MILLIS = 100L;

    private final ExecutorService batchProcessorManagerExecutor;
    private final ProfiledThreadPoolExecutor batchProcessorPool;
    private final BatchingParameters batchingParameters;
    private final Map<KeyT, Queue<Operation<PayloadT>>> pendingTableOperations;
    private final Semaphore batchProcessorsTracker;
    private final ConfigT config;
    private final BatchTask<ConfigT, PayloadT, KeyT> batchTask;
    private final Profiler profiler;
    private final String batchManagerId;
    private final BatchingManagerMetricsProvider metricsProvider;
    private final AtomicBoolean isShutdown = new AtomicBoolean();
    /** Monitor the manager thread sleeps on while no queue is ready; notified by {@link #shutdown()}. */
    private final Object waitForOperationLock = new Object();

    /**
     * Creates the manager together with its two executors. Nothing runs until {@link #start()} is called.
     *
     * @param config                 configuration passed to every batch processor
     * @param pendingTableOperations queues of pending operations, grouped by key
     * @param batchingParameters     batch size, batch timeout, thread count and blocking mode
     * @param profiler               profiler used by the executors and the metrics provider
     * @param batchManagerId         identifier used in executor names and metrics
     * @param batchTask              task passed to every batch processor
     */
    public BatchingManagerThread(
            ConfigT config,
            Map<KeyT, Queue<Operation<PayloadT>>> pendingTableOperations,
            BatchingParameters batchingParameters,
            Profiler profiler,
            String batchManagerId,
            BatchTask<ConfigT, PayloadT, KeyT> batchTask) {
        this.batchProcessorPool = NamedExecutors.newDynamicPool(
                "batching-manager-" + batchManagerId,
                DynamicProperty.of(batchingParameters.getBatchThreads()),
                profiler);
        this.batchProcessorsTracker = new Semaphore(batchingParameters.getBatchThreads());
        this.pendingTableOperations = pendingTableOperations;
        this.batchingParameters = batchingParameters;
        this.config = config;
        this.profiler = profiler;
        this.batchManagerId = batchManagerId;
        this.batchTask = batchTask;
        this.batchProcessorManagerExecutor =
                NamedExecutors.newSingleThreadPool("BatchingManagerThread_ " + batchManagerId, profiler);
        this.metricsProvider = new BatchingManagerMetricsProvider(batchManagerId, profiler);
    }

    /**
     * Shuts the executor down gracefully, waiting up to ten minutes for running tasks;
     * after that (or when the caller is interrupted) the executor is forced to stop.
     * The interrupt status of the calling thread is preserved.
     */
    private static void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10L, TimeUnit.MINUTES)) {
                executorService.shutdownNow();
                executorService.awaitTermination(1L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the manager loop and both executors, waiting for them to terminate.
     *
     * <p>Only the first invocation performs the shutdown; any later invocation logs
     * and returns immediately without waiting.
     */
    public void shutdown() {
        if (!isShutdown.compareAndSet(false, true)) {
            log.debug("already shutdown");
            return;
        }
        // Wake the manager thread if it is sleeping between polls, so that it observes
        // the shutdown flag right away instead of after the remaining batch timeout.
        synchronized (waitForOperationLock) {
            waitForOperationLock.notifyAll();
        }
        shutdownAndAwaitTermination(batchProcessorManagerExecutor);
        shutdownAndAwaitTermination(batchProcessorPool);
    }

    /** Submits this manager loop to its dedicated single-thread executor. */
    public void start() {
        batchProcessorManagerExecutor.execute(this);
        log.trace("BatchProcessorManagerThread executor started");
    }

    /**
     * Runs the manager loop until shutdown or interruption. Queue-size indicators are
     * detached on every exit path, including exceptional ones.
     */
    @Override
    public void run() {
        log.trace("BatchProcessorManager thread started.");
        try {
            processUntilShutdown();
        } finally {
            metricsProvider.detachOperationQueueSizeIndicators();
        }
        log.trace("BatchProcessorManager thread stopped.");
    }

    /** Body of the manager loop; returns when shutdown is requested or the thread is interrupted. */
    private void processUntilShutdown() {
        while (!isShutdown.get()) {
            log.trace("Check if thread interrupted");
            if (Thread.currentThread().isInterrupted()) {
                log.trace(THREAD_INTERRUPTED_MESSAGE);
                return;
            }
            if (metricsProvider.profileBatchProcessorAwaitThread(this::waitForAvailableBatchProcessThread)) {
                return;
            }

            log.trace("Wait for available operations and copy them to local buffer.");
            List<Operation<PayloadT>> buffer = null;
            KeyT key = null;
            while (buffer == null && !isShutdown.get()) {
                Map.Entry<KeyT, Queue<Operation<PayloadT>>> readyEntry = takeAnyQueueReadyForProcessing();
                if (readyEntry != null) {
                    key = readyEntry.getKey();
                    buffer = prepareBuffer(readyEntry);
                } else if (metricsProvider.profileBatchManagerThreadAwaitOperation(
                        this::calculateTimeForSleepAndWaitForOperations)) {
                    return;
                }
            }

            if (buffer != null && !isShutdown.get()) {
                log.trace("Start batch processor");
                executeBatchForBuffer(buffer, key);
            }
        }
    }

    /** Submits a batch processor for the given buffer to the batch processor pool. */
    private void executeBatchForBuffer(List<Operation<PayloadT>> list, KeyT keyt) {
        batchProcessorPool.execute(new BatchProcessor(
                config, list, batchProcessorsTracker, batchTask, keyt, batchManagerId, profiler));
    }

    /**
     * Blocks until a permit of {@code batchProcessorsTracker} can be obtained or shutdown
     * is requested.
     *
     * @return {@code true} if the thread was interrupted while waiting (interrupt status
     *         is restored), {@code false} otherwise
     */
    private boolean waitForAvailableBatchProcessThread() {
        log.trace("Wait available thread in BatchProcessor thread pool");
        boolean permitObtained = false;
        while (!isShutdown.get() && !permitObtained) {
            try {
                permitObtained = batchProcessorsTracker.tryAcquire(
                        BATCH_PROCESSOR_PERMIT_POLL_MILLIS, TimeUnit.MILLISECONDS);
                if (permitObtained) {
                    log.trace("isBatchProcessorThreadAvailable");
                    // The permit is only probed here and handed back immediately.
                    // NOTE(review): presumably BatchProcessor acquires its own permit
                    // from the same semaphore - confirm against BatchProcessor.
                    batchProcessorsTracker.release();
                }
            } catch (InterruptedException e) {
                log.trace(THREAD_INTERRUPTED_MESSAGE, e);
                Thread.currentThread().interrupt();
                return true;
            }
        }
        return false;
    }

    /**
     * Finds a queue whose batch is full or whose oldest operation has waited longer than
     * the batch timeout. A queue-size indicator is requested for every entry visited
     * before the search stops.
     *
     * @return the first matching entry in iteration order, or {@code null} if none is ready
     */
    @Nullable
    private Map.Entry<KeyT, Queue<Operation<PayloadT>>> takeAnyQueueReadyForProcessing() {
        for (Map.Entry<KeyT, Queue<Operation<PayloadT>>> entry : pendingTableOperations.entrySet()) {
            Queue<Operation<PayloadT>> queue = entry.getValue();
            metricsProvider.createOperationsQueueSizeIndicatorIfNeeded(entry.getKey().toString(), queue);
            if (isBatchFull(queue) || isTimeoutExpired(queue)) {
                return entry;
            }
        }
        return null;
    }

    /**
     * Sleeps until the oldest pending operation reaches the batch timeout (or for a whole
     * batch timeout when all queues are empty), but never less than one millisecond.
     * The sleep ends early when {@link #shutdown()} is called.
     *
     * @return {@code true} if the thread was interrupted while sleeping (interrupt status
     *         is restored), {@code false} otherwise
     */
    private boolean calculateTimeForSleepAndWaitForOperations() {
        log.trace("calculate time to sleep");
        long waitMillis = batchingParameters.getBatchTimeout();
        Optional<Long> oldestCreationTimestamp = getOldestOperationCreationTimeStamp();
        if (oldestCreationTimestamp.isPresent()) {
            waitMillis -= System.currentTimeMillis() - oldestCreationTimestamp.get();
        }
        // Object.wait(0) would wait indefinitely, so the timeout is clamped to at least 1 ms.
        waitMillis = Math.max(1L, waitMillis);

        log.trace("queue is empty, wait {}ms", waitMillis);
        synchronized (waitForOperationLock) {
            // Re-check under the lock: shutdown() notifies under the same lock, so a
            // shutdown requested just before this point is not missed.
            if (isShutdown.get()) {
                return false;
            }
            try {
                waitForOperationLock.wait(waitMillis);
                log.trace("leave wait section");
            } catch (InterruptedException e) {
                log.trace(THREAD_INTERRUPTED_MESSAGE, e);
                Thread.currentThread().interrupt();
                return true;
            }
        }
        return false;
    }

    /**
     * @return the smallest creation timestamp among the head operations of all queues,
     *         or an empty optional when every queue is empty
     */
    @NotNull
    private Optional<Long> getOldestOperationCreationTimeStamp() {
        return pendingTableOperations.values().stream()
                .map(Queue::peek)
                .filter(Objects::nonNull)
                .map(Operation::getCreationTimestamp)
                .min(Long::compare);
    }

    /**
     * Drains up to one batch of operations from the queue of the given entry.
     * When blocking on limit is enabled, threads waiting on the queue's monitor are notified.
     *
     * @return the drained operations, in queue order
     */
    private List<Operation<PayloadT>> prepareBuffer(Map.Entry<KeyT, Queue<Operation<PayloadT>>> entry) {
        KeyT key = entry.getKey();
        Queue<Operation<PayloadT>> queue = entry.getValue();
        profileTimeSpentInQueueForTable(key, queue);

        List<Operation<PayloadT>> buffer = Stream.generate(queue::poll)
                .limit(getBatchSize())
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        if (batchingParameters.isBlockIfLimitExceeded()) {
            synchronized (queue) {
                queue.notifyAll();
            }
        }
        log.trace("Prepared buffer for \"{}\" table, buffer size {}, queue size {}",
                key, buffer.size(), queue.size());
        return buffer;
    }

    /** Reports how long the head operation of the queue has been waiting; does nothing for an empty queue. */
    private void profileTimeSpentInQueueForTable(KeyT keyt, Queue<Operation<PayloadT>> queue) {
        Operation<PayloadT> head = queue.peek();
        if (head != null) {
            metricsProvider.profileTimeOperationSpentInQueue(head.getCreationTimestamp(), keyt.toString());
        }
    }

    private int getBatchSize() {
        return batchingParameters.getBatchSize();
    }

    /** @return {@code true} when the queue holds at least one full batch */
    private boolean isBatchFull(Queue<Operation<PayloadT>> queue) {
        return queue.size() >= getBatchSize();
    }

    /** @return {@code true} when the head operation has been queued for longer than the batch timeout */
    private boolean isTimeoutExpired(Queue<Operation<PayloadT>> queue) {
        Operation<PayloadT> head = queue.peek();
        if (head == null) {
            return false;
        }
        long ageMillis = System.currentTimeMillis() - head.getCreationTimestamp();
        return ageMillis > (long) batchingParameters.getBatchTimeout();
    }

    /**
     * Updates the configured number of batch threads and the maximum size of the batch
     * processor pool.
     *
     * <p>NOTE(review): {@code batchProcessorsTracker} keeps the permit count it was created
     * with, so it is not resized here - confirm whether the semaphore should follow the
     * new thread count.
     *
     * @param i new number of batch processor threads
     * @throws InterruptedException declared for callers; not thrown by the current implementation
     */
    public void setThreadCount(int i) throws InterruptedException {
        synchronized (batchProcessorPool) {
            batchingParameters.setBatchThreads(i);
            batchProcessorPool.setMaxPoolSize(i);
        }
    }
}
