package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.BatchFlusher;
import dev.responsive.kafka.internal.db.KeySpec;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.StoreMetrics;
import dev.responsive.kafka.internal.utils.ExceptionSupplier;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.Result;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.TableName;
import dev.responsive.kafka.internal.utils.Utils;
import java.io.Closeable;
import java.lang.Comparable;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/CommitBuffer.class */
public class CommitBuffer<K extends Comparable<K>, P> implements RecordBatchingStateRestoreCallback, Closeable {
    public static final int MAX_BATCH_SIZE = 1000;
    private final Logger log;
    private final String logPrefix;
    private final BatchFlusher<K, P> batchFlusher;
    private final SizeTrackingBuffer<K> buffer;
    private final ResponsiveMetrics metrics;
    private final Admin admin;
    private final TopicPartition changelog;
    private final FlushTriggers flushTriggers;
    private final ExceptionSupplier exceptionSupplier;
    private final int maxBatchSize;
    private final Supplier<Instant> clock;
    private final KeySpec<K> keySpec;
    private final String flushSensorName;
    private final String flushLatencySensorName;
    private final String flushErrorsSensorName;
    private final MetricName lastFlushMetric;
    private final Sensor flushSensor;
    private final Sensor flushLatencySensor;
    private final Sensor flushErrorsSensor;
    private KafkaFuture<DeletedRecords> deleteRecordsFuture;
    private Instant lastFlush;

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K extends Comparable<K>, P> CommitBuffer<K, P> from(BatchFlusher<K, P> batchFlusher, SessionClients sessionClients, TopicPartition topicPartition, KeySpec<K> keySpec, TableName tableName, ResponsiveConfig responsiveConfig) {
        return new CommitBuffer<>(batchFlusher, sessionClients, topicPartition, sessionClients.admin(), keySpec, tableName, FlushTriggers.fromConfig(responsiveConfig), ExceptionSupplier.fromConfig(responsiveConfig.originals()));
    }

    CommitBuffer(BatchFlusher<K, P> batchFlusher, SessionClients sessionClients, TopicPartition topicPartition, Admin admin, KeySpec<K> keySpec, TableName tableName, FlushTriggers flushTriggers, ExceptionSupplier exceptionSupplier) {
        this(batchFlusher, sessionClients, topicPartition, admin, keySpec, tableName, flushTriggers, exceptionSupplier, MAX_BATCH_SIZE, Instant::now);
    }

    CommitBuffer(BatchFlusher<K, P> batchFlusher, SessionClients sessionClients, TopicPartition topicPartition, Admin admin, KeySpec<K> keySpec, TableName tableName, FlushTriggers flushTriggers, ExceptionSupplier exceptionSupplier, int i, Supplier<Instant> supplier) {
        this.deleteRecordsFuture = KafkaFuture.completedFuture((Object) null);
        this.batchFlusher = batchFlusher;
        this.changelog = topicPartition;
        this.metrics = sessionClients.metrics();
        this.admin = admin;
        this.buffer = new SizeTrackingBuffer<>(keySpec);
        this.keySpec = keySpec;
        this.flushTriggers = flushTriggers;
        this.exceptionSupplier = exceptionSupplier;
        this.maxBatchSize = i;
        this.clock = supplier;
        this.lastFlush = supplier.get();
        this.logPrefix = String.format("commit-buffer [%s-%d] ", tableName.tableName(), Integer.valueOf(topicPartition.partition()));
        this.log = new LogContext(this.logPrefix).logger(CommitBuffer.class);
        String kafkaName = tableName.kafkaName();
        this.flushSensorName = getSensorName(StoreMetrics.FLUSH, topicPartition);
        this.flushLatencySensorName = getSensorName(StoreMetrics.FLUSH_LATENCY, topicPartition);
        this.flushErrorsSensorName = getSensorName(StoreMetrics.FLUSH_ERRORS, topicPartition);
        String extractThreadId = Utils.extractThreadId(Thread.currentThread().getName());
        this.lastFlushMetric = this.metrics.metricName(StoreMetrics.TIME_SINCE_LAST_FLUSH, StoreMetrics.TIME_SINCE_LAST_FLUSH_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName));
        this.metrics.addMetric(this.lastFlushMetric, (metricConfig, j) -> {
            return Long.valueOf(j - this.lastFlush.toEpochMilli());
        });
        this.flushSensor = this.metrics.addSensor(this.flushSensorName);
        this.flushSensor.add(this.metrics.metricName(StoreMetrics.FLUSH_RATE, StoreMetrics.FLUSH_RATE_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName)), new Rate());
        this.flushSensor.add(this.metrics.metricName(StoreMetrics.FLUSH_TOTAL, StoreMetrics.FLUSH_TOTAL_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName)), new CumulativeCount());
        this.flushLatencySensor = this.metrics.addSensor(this.flushLatencySensorName);
        this.flushLatencySensor.add(this.metrics.metricName(StoreMetrics.FLUSH_LATENCY_AVG, StoreMetrics.FLUSH_LATENCY_AVG_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName)), new Avg());
        this.flushLatencySensor.add(this.metrics.metricName(StoreMetrics.FLUSH_LATENCY_MAX, StoreMetrics.FLUSH_LATENCY_MAX_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName)), new Max());
        this.flushErrorsSensor = this.metrics.addSensor(this.flushErrorsSensorName);
        this.flushErrorsSensor.add(this.metrics.metricName(StoreMetrics.FLUSH_ERRORS_RATE, StoreMetrics.FLUSH_ERRORS_RATE_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName)), new Rate());
        this.flushErrorsSensor.add(this.metrics.metricName(StoreMetrics.FLUSH_ERRORS_TOTAL, StoreMetrics.FLUSH_ERRORS_TOTAL_DESCRIPTION, this.metrics.storeLevelMetric(StoreMetrics.STORE_METRIC_GROUP, extractThreadId, topicPartition, kafkaName)), new CumulativeCount());
    }

    private static String getSensorName(String str, TopicPartition topicPartition) {
        return str + "-" + topicPartition;
    }

    private static boolean hasSourceTopicChangelog(String str) {
        return !str.endsWith("changelog");
    }

    public void put(K k, byte[] bArr, long j) {
        this.buffer.put(k, Result.value(k, bArr, j));
    }

    public void tombstone(K k, long j) {
        this.buffer.put(k, Result.tombstone(k, j));
    }

    public Result<K> get(K k) {
        Result<K> result = (Result) this.buffer.getReader().get(k);
        if (result == null || !this.keySpec.retain(result.key)) {
            return null;
        }
        return result;
    }

    public Result<K> get(K k, Predicate<Result<K>> predicate) {
        Result<K> result = (Result) this.buffer.getReader().get(k);
        if (result != null && this.keySpec.retain(result.key) && predicate.test(result)) {
            return result;
        }
        return null;
    }

    public KeyValueIterator<K, Result<K>> range(K k, K k2) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().subMap(k, true, k2, true).entrySet().iterator(), entry -> {
            return this.keySpec.retain((Comparable) entry.getKey());
        }), entry2 -> {
            return new KeyValue((Comparable) entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> range(K k, K k2, Predicate<Result<K>> predicate) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().subMap(k, true, k2, true).entrySet().iterator(), entry -> {
            return this.keySpec.retain((Comparable) entry.getKey()) && predicate.test((Result) entry.getValue());
        }), entry2 -> {
            return new KeyValue((Comparable) entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> backRange(K k, K k2) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().descendingMap().subMap(k2, k).entrySet().iterator(), entry -> {
            return this.keySpec.retain((Comparable) entry.getKey());
        }), entry2 -> {
            return new KeyValue((Comparable) entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> all() {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().entrySet().iterator(), entry -> {
            return this.keySpec.retain((Comparable) entry.getKey());
        }), entry2 -> {
            return new KeyValue((Comparable) entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> all(Predicate<Result<K>> predicate) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().entrySet().iterator(), entry -> {
            return this.keySpec.retain((Comparable) entry.getKey()) && predicate.test((Result) entry.getValue());
        }), entry2 -> {
            return new KeyValue((Comparable) entry2.getKey(), (Result) entry2.getValue());
        });
    }

    public KeyValueIterator<K, Result<K>> backAll(Predicate<K> predicate) {
        return Iterators.kv(Iterators.filter(this.buffer.getReader().descendingMap().entrySet().iterator(), entry -> {
            return this.keySpec.retain((Comparable) entry.getKey()) && predicate.test((Comparable) entry.getKey());
        }), entry2 -> {
            return new KeyValue((Comparable) entry2.getKey(), (Result) entry2.getValue());
        });
    }

    private long totalBytesBuffered() {
        return this.buffer.getBytes();
    }

    private boolean triggerFlush() {
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        if (this.buffer.getReader().size() >= this.flushTriggers.getRecords()) {
            this.log.debug("Will flush due to records buffered {} over trigger {}", Integer.valueOf(this.buffer.getReader().size()), Integer.valueOf(this.flushTriggers.getRecords()));
            z = true;
        } else {
            this.log.debug("Records buffered since last flush {} not over trigger {}", Integer.valueOf(this.buffer.getReader().size()), Integer.valueOf(this.flushTriggers.getRecords()));
        }
        long j = totalBytesBuffered();
        if (j >= this.flushTriggers.getBytes()) {
            this.log.debug("Will flush due to bytes buffered {} over bytes trigger {}", Long.valueOf(j), Long.valueOf(this.flushTriggers.getBytes()));
            z2 = true;
        } else {
            this.log.debug("Bytes buffered since last flush {} not over trigger {}", Long.valueOf(j), Long.valueOf(this.flushTriggers.getBytes()));
        }
        Instant instant = this.clock.get();
        if (this.lastFlush.plus((TemporalAmount) this.flushTriggers.getInterval()).isBefore(instant)) {
            this.log.debug("Will flush as time since last flush {} over interval trigger {}", Duration.between(this.lastFlush, instant), instant);
            z3 = true;
        } else {
            this.log.debug("Time since last flush {} not over trigger {}", Duration.between(this.lastFlush, instant), instant);
        }
        return z || z2 || z3;
    }

    public void flush(long j) {
        if (triggerFlush()) {
            this.flushTriggers.reset();
            doFlush(j, this.maxBatchSize);
            this.lastFlush = this.clock.get();
        }
    }

    private void doFlush(long j, int i) {
        long nanoTime = System.nanoTime();
        this.log.info("Flushing {} records with batchSize={} to remote (offset={}, writer={})", new Object[]{Integer.valueOf(this.buffer.getReader().size()), Integer.valueOf(i), Long.valueOf(j), this.batchFlusher});
        BatchFlusher.FlushResult<K, P> flushWriteBatch = this.batchFlusher.flushWriteBatch(this.buffer.getReader(), j);
        if (!flushWriteBatch.result().wasApplied()) {
            P tablePartition = flushWriteBatch.result().tablePartition();
            this.log.warn("Error while flushing batch for table partition {}", tablePartition);
            this.flushErrorsSensor.record();
            String format = String.format("Failed table partition [%s]: %s", tablePartition, flushWriteBatch.failedFlushInfo(j, tablePartition));
            this.log.warn(format);
            throw this.exceptionSupplier.commitFencedException(this.logPrefix + format);
        }
        long nanoTime2 = System.nanoTime();
        long millis = TimeUnit.NANOSECONDS.toMillis(nanoTime2);
        long millis2 = TimeUnit.NANOSECONDS.toMillis(nanoTime2 - nanoTime);
        this.flushSensor.record(1.0d, millis);
        this.flushLatencySensor.record(millis2, millis);
        this.log.info("Flushed {} records to {} table partitions with offset {} in {}ms", new Object[]{Integer.valueOf(this.buffer.getReader().size()), Integer.valueOf(flushWriteBatch.numTablePartitionsFlushed()), Long.valueOf(j), Long.valueOf(millis2)});
        this.buffer.clear();
    }

    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        ArrayList arrayList = new ArrayList(this.maxBatchSize);
        Iterator<ConsumerRecord<byte[], byte[]>> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next());
            if (arrayList.size() >= this.maxBatchSize) {
                restoreCassandraBatch(arrayList);
                arrayList.clear();
            }
        }
        if (arrayList.size() > 0) {
            restoreCassandraBatch(arrayList);
        }
    }

    private void restoreCassandraBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        long j = -1;
        for (ConsumerRecord<byte[], byte[]> consumerRecord : collection) {
            j = consumerRecord.offset();
            if (consumerRecord.value() == null) {
                tombstone(this.keySpec.keyFromRecord(consumerRecord), consumerRecord.timestamp());
            } else {
                put(this.keySpec.keyFromRecord(consumerRecord), (byte[]) consumerRecord.value(), consumerRecord.timestamp());
            }
        }
        if (j >= 0) {
            doFlush(j, collection.size());
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.deleteRecordsFuture.cancel(true);
        this.metrics.removeMetric(this.lastFlushMetric);
        this.metrics.removeSensor(this.flushSensorName);
        this.metrics.removeSensor(this.flushLatencySensorName);
        this.metrics.removeSensor(this.flushErrorsSensorName);
    }
}
