package ru.fix.stdlib.batching;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.fix.aggregating.profiler.Profiler;

/* loaded from: input_file:ru/fix/stdlib/batching/BatchingManager.class */
public class BatchingManager<ConfigT, PayloadT, KeyT> implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(BatchingManager.class);
    private final Map<KeyT, Queue<Operation<PayloadT>>> pendingTableOperations = new ConcurrentHashMap();
    private final BatchingParameters batchingParameters;
    private final BatchingManagerThread<ConfigT, PayloadT, KeyT> batchingManagerThread;
    private final String batchManagerId;
    private final Profiler profiler;

    public BatchingManager(ConfigT configt, BatchTask<ConfigT, PayloadT, KeyT> batchTask, BatchingParameters batchingParameters, String str, Profiler profiler) {
        this.batchingParameters = batchingParameters;
        this.batchManagerId = str;
        this.profiler = profiler;
        profiler.attachIndicator(getIndicatorName(str), () -> {
            return Long.valueOf(this.pendingTableOperations.values().stream().mapToLong((v0) -> {
                return v0.size();
            }).sum());
        });
        this.batchingManagerThread = new BatchingManagerThread<>(configt, this.pendingTableOperations, batchingParameters, profiler, str, batchTask);
        this.batchingManagerThread.start();
    }

    private static String getIndicatorName(String str) {
        return "Batching.manager." + str;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.batchingManagerThread.shutdown();
        this.profiler.detachIndicator(getIndicatorName(this.batchManagerId));
    }

    public void enqueue(KeyT keyt, PayloadT payloadt) throws MaxPendingOperationExceededException, InterruptedException {
        log.trace("enqueue(key: {}, payload: {}", keyt, payloadt);
        Queue<Operation<PayloadT>> computeIfAbsent = this.pendingTableOperations.computeIfAbsent(keyt, obj -> {
            return new ConcurrentLinkedQueue();
        });
        Operation<PayloadT> operation = new Operation<>(payloadt, System.currentTimeMillis());
        if (isBatchLimitReached(computeIfAbsent)) {
            if (!this.batchingParameters.isBlockIfLimitExceeded()) {
                throw new MaxPendingOperationExceededException(String.format("Queue operations peaked, maxPendingOperations = %d", Integer.valueOf(this.batchingParameters.getMaxPendingOperations())));
            }
            while (isBatchLimitReached(computeIfAbsent)) {
                synchronized (computeIfAbsent) {
                    computeIfAbsent.wait();
                }
            }
        }
        computeIfAbsent.add(operation);
        if (log.isTraceEnabled()) {
            log.trace("Queue size for key {} is {}", keyt, Integer.valueOf(computeIfAbsent.size()));
        }
    }

    private boolean isBatchLimitReached(Queue<Operation<PayloadT>> queue) {
        return queue.size() >= this.batchingParameters.getMaxPendingOperations();
    }

    public BatchingManager<ConfigT, PayloadT, KeyT> setBatchThreads(int i) {
        try {
            this.batchingManagerThread.setThreadCount(i);
            return this;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public BatchingManager<ConfigT, PayloadT, KeyT> setBatchSize(int i) {
        this.batchingParameters.setBatchSize(i);
        return this;
    }

    public BatchingManager<ConfigT, PayloadT, KeyT> setMaxPendingOperations(int i) {
        this.batchingParameters.setMaxPendingOperations(i);
        return this;
    }

    public BatchingManager<ConfigT, PayloadT, KeyT> setBatchTimeout(int i) {
        this.batchingParameters.setBatchTimeout(i);
        return this;
    }
}
