package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;

/* loaded from: input_file:dev/responsive/kafka/internal/db/LwtWriterFactory.class */
public class LwtWriterFactory<K, P> extends WriterFactory<K, P> {
    private final RemoteTable<K, BoundStatement> table;
    private final TableMetadata<P> tableMetadata;
    private final CassandraClient client;
    private final TablePartitioner<K, P> partitioner;
    private final int kafkaPartition;
    private final long epoch;

    public LwtWriterFactory(RemoteTable<K, BoundStatement> remoteTable, TableMetadata<P> tableMetadata, CassandraClient cassandraClient, TablePartitioner<K, P> tablePartitioner, int i, long j) {
        super(String.format("LwtWriterFactory{epoch=%d} ", Long.valueOf(j)));
        this.table = remoteTable;
        this.tableMetadata = tableMetadata;
        this.client = cassandraClient;
        this.partitioner = tablePartitioner;
        this.kafkaPartition = i;
        this.epoch = j;
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    public RemoteWriter<K, P> createWriter(P p) {
        return new LwtWriter(this.client, () -> {
            return this.tableMetadata.ensureEpoch(p, this.epoch);
        }, this.table, this.kafkaPartition, p);
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    public String tableName() {
        return this.table.name();
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    protected P tablePartitionForKey(K k) {
        return this.partitioner.tablePartition(this.kafkaPartition, k);
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    public RemoteWriteResult<P> setOffset(long j) {
        P metadataTablePartition = this.partitioner.metadataTablePartition(this.kafkaPartition);
        BatchStatementBuilder batchStatementBuilder = new BatchStatementBuilder(BatchType.UNLOGGED);
        batchStatementBuilder.addStatement(fencingStatement(metadataTablePartition));
        batchStatementBuilder.addStatement(this.table.setOffset(this.kafkaPartition, j));
        if (this.table instanceof CassandraWindowedTable) {
            batchStatementBuilder.addStatement(((CassandraWindowedTable) this.table).setStreamTime(this.kafkaPartition, this.epoch));
        }
        return this.client.execute((Statement<?>) batchStatementBuilder.build()).wasApplied() ? RemoteWriteResult.success(metadataTablePartition) : RemoteWriteResult.failure(metadataTablePartition);
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    protected long offset() {
        return this.table.fetchOffset(this.kafkaPartition);
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    public RemoteWriteResult<P> commitPendingFlush(WriterFactory<K, P>.PendingFlush pendingFlush, long j) {
        this.tableMetadata.preCommit(this.kafkaPartition, this.epoch);
        RemoteWriteResult<P> commitPendingFlush = super.commitPendingFlush(pendingFlush, j);
        this.tableMetadata.postCommit(this.kafkaPartition, this.epoch);
        return commitPendingFlush;
    }

    @Override // dev.responsive.kafka.internal.db.WriterFactory
    public String failedFlushError(RemoteWriteResult<P> remoteWriteResult, long j) {
        return super.failedFlushError(remoteWriteResult, j) + String.format(", Local Epoch: %s, Persisted Epoch: %d", Long.valueOf(this.epoch), Long.valueOf(this.tableMetadata.fetchEpoch(remoteWriteResult.tablePartition())));
    }

    private BoundStatement fencingStatement(P p) {
        return this.tableMetadata.ensureEpoch(p, this.epoch);
    }
}
