package dev.responsive.kafka.store;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import dev.responsive.db.CassandraClient;
import dev.responsive.model.Result;
import dev.responsive.utils.Iterators;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/responsive/kafka/store/CommitBuffer.class */
/**
 * Sorted in-memory buffer of pending writes and tombstones for a single
 * changelog partition.
 *
 * <p>Entries are kept in a {@link TreeMap} ordered by the supplied
 * {@link BufferPlugin} and are written to the remote table through the
 * {@link CassandraClient} in batches when {@link #flush()} is called. The class
 * is also a restore callback: {@link #restoreBatch(Collection)} buffers
 * changelog records that are newer than the stored offset and flushes them.
 *
 * @param <K> the key type of the buffered entries
 */
public class CommitBuffer<K> implements RecordBatchingStateRestoreCallback {
    /** Maximum number of buffered entries consumed per remote batch. */
    public static final int MAX_BATCH_SIZE = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(CommitBuffer.class);
    private final NavigableMap<K, Result<K>> buffer;
    private final CassandraClient client;
    private final String tableName;
    private final int partition;
    private final Admin admin;
    private final RecordCollector.Supplier recordCollector;
    private final TopicPartition changelog;
    private final BufferPlugin<K> plugin;

    /**
     * Key-type specific behaviour of the buffer. The plugin is also the
     * {@link Comparator} that orders the buffered keys.
     *
     * @param <K> the key type of the buffered entries
     */
    public interface BufferPlugin<K> extends Comparator<K> {
        /**
         * Derives the buffer key for a changelog record handed to
         * {@link CommitBuffer#restoreBatch(Collection)}.
         */
        K keyFromRecord(ConsumerRecord<byte[], byte[]> consumerRecord);

        /**
         * Returns the statement added to a flush batch for a buffered
         * (non-tombstone) value.
         */
        BoundStatement insertData(
                CassandraClient client, String tableName, int partition, K key, byte[] value);

        /** Returns the statement added to a flush batch for a buffered tombstone. */
        BoundStatement deleteData(CassandraClient client, String tableName, int partition, K key);

        /**
         * Decides whether an entry with this key is still visible. Non-retained
         * keys are hidden from reads, and non-retained values (but not
         * tombstones) are skipped on flush. The default retains everything.
         */
        default boolean retain(K key) {
            return true;
        }
    }

    /**
     * Creates an empty buffer.
     *
     * @param cassandraClient client used to read the stored offset and execute batches
     * @param tableName       name of the remote table
     * @param changelog       changelog topic-partition; its partition number is used
     *                        as the remote partition
     * @param recordCollector supplies the record collector whose offsets give the
     *                        changelog end offset at flush time
     * @param admin           admin client used to truncate the changelog after a flush
     * @param plugin          key-specific behaviour and key ordering
     */
    public CommitBuffer(
            CassandraClient cassandraClient,
            String tableName,
            TopicPartition changelog,
            RecordCollector.Supplier recordCollector,
            Admin admin,
            BufferPlugin<K> plugin) {
        this.client = cassandraClient;
        this.tableName = tableName;
        this.recordCollector = recordCollector;
        this.changelog = changelog;
        this.partition = changelog.partition();
        this.admin = admin;
        this.plugin = plugin;
        this.buffer = new TreeMap<>(plugin);
    }

    /** Buffers a value for {@code key}, replacing any previously buffered entry. */
    public void put(K key, byte[] value) {
        this.buffer.put(key, Result.value(key, value));
    }

    /** Buffers a tombstone for {@code key}, replacing any previously buffered entry. */
    public void tombstone(K key) {
        this.buffer.put(key, Result.tombstone(key));
    }

    /**
     * Looks up the buffered entry for {@code key}.
     *
     * @return the buffered result (which may be a tombstone), or {@code null} when
     *         nothing is buffered for the key or the plugin no longer retains it
     */
    public Result<K> get(K key) {
        Result<K> result = this.buffer.get(key);
        if (result == null || !this.plugin.retain(result.key)) {
            return null;
        }
        return result;
    }

    /**
     * Iterates retained entries in ascending order over {@code [from, to)}
     * ({@code from} inclusive, {@code to} exclusive).
     */
    public KeyValueIterator<K, Result<K>> range(K from, K to) {
        return retained(this.buffer.subMap(from, to).entrySet().iterator(), key -> true);
    }

    /**
     * Same as {@link #range(Object, Object)} but additionally restricted to keys
     * accepted by {@code predicate}.
     */
    public KeyValueIterator<K, Result<K>> range(K from, K to, Predicate<K> predicate) {
        return retained(this.buffer.subMap(from, to).entrySet().iterator(), predicate);
    }

    /**
     * Iterates retained entries in descending order between {@code to} and
     * {@code from}. Because the sub-map is taken on the descending view, the
     * bounds are {@code to} inclusive and {@code from} exclusive, which is not
     * the mirror image of {@link #range(Object, Object)}.
     */
    public KeyValueIterator<K, Result<K>> backRange(K from, K to) {
        return retained(
                this.buffer.descendingMap().subMap(to, from).entrySet().iterator(), key -> true);
    }

    /** Iterates all retained entries in ascending key order. */
    public KeyValueIterator<K, Result<K>> all() {
        return retained(this.buffer.entrySet().iterator(), key -> true);
    }

    /** Iterates, in ascending key order, retained entries whose key matches {@code predicate}. */
    public KeyValueIterator<K, Result<K>> all(Predicate<K> predicate) {
        return retained(this.buffer.entrySet().iterator(), predicate);
    }

    /** Iterates, in descending key order, retained entries whose key matches {@code predicate}. */
    public KeyValueIterator<K, Result<K>> backAll(Predicate<K> predicate) {
        return retained(this.buffer.descendingMap().entrySet().iterator(), predicate);
    }

    /**
     * Wraps a buffer entry iterator so that it only yields entries the plugin
     * retains and {@code keyFilter} accepts, exposed as key/value pairs.
     */
    private KeyValueIterator<K, Result<K>> retained(
            Iterator<Map.Entry<K, Result<K>>> entries, Predicate<K> keyFilter) {
        return Iterators.kv(
                Iterators.filter(
                        entries,
                        entry -> this.plugin.retain(entry.getKey()) && keyFilter.test(entry.getKey())),
                entry -> new KeyValue<K, Result<K>>(entry.getKey(), entry.getValue()));
    }

    /** Returns the number of buffered entries, including ones the plugin no longer retains. */
    int size() {
        return this.buffer.size();
    }

    /**
     * Flushes the buffer to the remote table under a freshly generated
     * transaction id, using the record collector's current offset for the
     * changelog as the end offset. An empty buffer is a logged no-op.
     *
     * @throws IllegalStateException   if the record collector is {@code null} or has
     *                                 no offset for the changelog
     * @throws ProcessorStateException if a batch or the finalizing statement was not
     *                                 applied
     */
    public void flush() {
        if (this.buffer.isEmpty()) {
            LOG.info("Ignoring flush() to empty commit buffer for {}[{}]", this.tableName, this.partition);
            return;
        }
        RecordCollector collector = this.recordCollector.recordCollector();
        if (collector == null) {
            throw new IllegalStateException("Unexpected null record collector for " + this.tableName + "[" + this.partition + "]");
        }
        Long endOffset = collector.offsets().get(this.changelog);
        if (endOffset == null) {
            throw new IllegalStateException("Unexpected state: buffer is non-empty but no write has gone to changelog");
        }
        UUID txnId = UUID.randomUUID();
        if (flush(endOffset, txnId)) {
            return;
        }
        // Re-read the stored offset row so the error shows what is persisted remotely.
        CassandraClient.OffsetRow stored = this.client.getOffset(this.tableName, this.partition);
        throw new ProcessorStateException("Failure to write batch to " + this.tableName + "[" + this.partition + "] with end offset " + endOffset + " and stored offset " + stored.offset + ". If the stored offset is larger than the end offset it is likely that this client was fenced by a more up to date consumer. txnId: " + txnId + " and stored txnId: " + stored.txind);
    }

    /**
     * Writes the buffered entries in batches and, when every batch was applied,
     * clears the buffer and attempts to truncate the changelog before
     * {@code offset}.
     *
     * @param offset the changelog offset associated with this flush
     * @param txnId  the transaction id, or {@code null} (as passed by
     *               {@link #restoreBatch(Collection)}), in which case each batch
     *               carries a revoke-permit statement and no finalize step is run
     * @return {@code false} as soon as a batch or the finalizing statement is not
     *         applied (the buffer is left untouched); {@code true} otherwise, even
     *         if the changelog truncation failed
     * @throws ProcessorStateException if interrupted while truncating the changelog
     */
    private boolean flush(long offset, UUID txnId) {
        LOG.info("Flushing {} records to remote {}[{}] (offset={}, transactionId={})",
                this.buffer.size(), this.tableName, this.partition, offset, txnId);
        Iterator<Map.Entry<K, Result<K>>> entries = this.buffer.entrySet().iterator();
        boolean firstBatch = true;
        while (entries.hasNext()) {
            BatchStatementBuilder batch = new BatchStatementBuilder(BatchType.UNLOGGED);
            if (txnId != null) {
                // Only the first batch expects the unset permit; later batches expect
                // the permit this transaction acquired.
                batch.addStatement(this.client.acquirePermit(this.tableName, this.partition, firstBatch ? CassandraClient.UNSET_PERMIT : txnId, txnId, offset));
                firstBatch = false;
            } else {
                batch.addStatement(this.client.revokePermit(this.tableName, this.partition, offset));
            }
            for (int i = 0; i < MAX_BATCH_SIZE && entries.hasNext(); i++) {
                Result<K> result = entries.next().getValue();
                // Tombstones are always written; values only while still retained.
                if (result.isTombstone || this.plugin.retain(result.key)) {
                    batch.addStatement(result.isTombstone
                            ? this.plugin.deleteData(this.client, this.tableName, this.partition, result.key)
                            : this.plugin.insertData(this.client, this.tableName, this.partition, result.key, result.value));
                }
            }
            batch.setIdempotence(true);
            if (!this.client.execute(batch.build()).wasApplied()) {
                return false;
            }
        }
        if (txnId != null && !this.client.execute(this.client.finalizeTxn(this.tableName, this.partition, txnId, offset)).wasApplied()) {
            return false;
        }
        LOG.info("Completed flushing {} records to remote {}[{}] (offset={}, transactionId={})",
                this.buffer.size(), this.tableName, this.partition, offset, txnId);
        this.buffer.clear();
        try {
            this.admin.deleteRecords(Map.of(this.changelog, RecordsToDelete.beforeOffset(offset))).all().get();
            LOG.info("Truncated changelog topic {} before offset {}", this.changelog, offset);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new ProcessorStateException("Interrupted while truncating " + this.changelog, e);
        } catch (ExecutionException e) {
            // Truncation is best-effort: the data is already flushed at this point.
            LOG.warn("Could not truncate changelog topic-partition {}.", this.changelog, e);
            return true;
        }
    }

    /**
     * Buffers every record whose offset is strictly greater than the stored
     * offset (a {@code null} value becomes a tombstone) and flushes the result
     * without a transaction id at the offset of the last buffered record. If no
     * record qualifies nothing is flushed. If the flush is not applied, the
     * buffer is cleared and a warning is logged instead of failing.
     *
     * @param collection the changelog records to restore
     */
    @Override
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        long storedOffset = this.client.getOffset(this.tableName, this.partition).offset;
        long batchOffset = -1;
        for (ConsumerRecord<byte[], byte[]> record : collection) {
            if (record.offset() <= storedOffset) {
                continue;
            }
            batchOffset = record.offset();
            if (record.value() == null) {
                tombstone(this.plugin.keyFromRecord(record));
            } else {
                put(this.plugin.keyFromRecord(record), record.value());
            }
        }
        if (batchOffset < 0 || flush(batchOffset, null)) {
            return;
        }
        this.buffer.clear();
        CassandraClient.OffsetRow latest = this.client.getOffset(this.tableName, this.partition);
        LOG.warn("Restoration for {}[{}] was fenced. There is likely an existing active replica that is writing to Cassandra. Original Offset: {}, Batch Offset: {}, Latest Offset: {}, Stored txnid: {}. This is not a problem but may cause rebalancing to take longer.",
                this.tableName, this.partition, storedOffset, batchOffset, latest.offset, latest.txind);
    }
}
