package dev.responsive.kafka.store;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import dev.responsive.db.CassandraClient;
import dev.responsive.kafka.store.CommitBuffer;
import dev.responsive.model.Result;
import dev.responsive.utils.RemoteMonitor;
import dev.responsive.utils.TableName;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/store/ResponsiveStore.class */
/**
 * A {@link KeyValueStore} over {@link Bytes} keys and {@code byte[]} values whose writes are
 * staged in a {@link CommitBuffer} and whose reads fall through to a remote table (accessed via
 * {@link CassandraClient}) when the buffer holds no entry for the requested key.
 *
 * <p>The store must be initialized with {@link #init(StateStoreContext, StateStore)} before any
 * read or write method is called; until then the commit buffer does not exist.
 */
public class ResponsiveStore implements KeyValueStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveStore.class);

    /** Maximum time {@link #init(StateStoreContext, StateStore)} waits on the remote monitor. */
    private static final Duration REMOTE_INIT_TIMEOUT = Duration.ofSeconds(60L);

    /** Shared, stateless plugin instance handed to the commit buffer and to the iterators. */
    static final Plugin PLUGIN = new Plugin();

    private final CassandraClient client;
    private final TableName name;
    private final RemoteMonitor initRemote;
    private final Admin admin;
    private final Position position = Position.emptyPosition();
    private boolean open;
    private CommitBuffer<Bytes> buffer;
    private InternalProcessorContext context;
    private int partition;

    /**
     * Adapts the generic {@link CommitBuffer} to {@link Bytes} keys: extracts keys from consumer
     * records, delegates statement creation to the {@link CassandraClient}, and orders keys by
     * {@link Bytes#compareTo}.
     */
    public static class Plugin implements CommitBuffer.BufferPlugin<Bytes> {
        private Plugin() {
        }

        /**
         * Wraps the raw key bytes of {@code consumerRecord} in a {@link Bytes} instance.
         *
         * @param consumerRecord the record whose key is extracted
         * @return the record key wrapped as {@link Bytes}
         */
        @Override
        public Bytes keyFromRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
            return Bytes.wrap(consumerRecord.key());
        }

        /** Delegates to {@link CassandraClient#insertData} with the arguments unchanged. */
        @Override
        public BoundStatement insertData(CassandraClient cassandraClient, String str, int i, Bytes bytes, byte[] bArr) {
            return cassandraClient.insertData(str, i, bytes, bArr);
        }

        /** Delegates to {@link CassandraClient#deleteData} with the arguments unchanged. */
        @Override
        public BoundStatement deleteData(CassandraClient cassandraClient, String str, int i, Bytes bytes) {
            return cassandraClient.deleteData(str, i, bytes);
        }

        /** Orders keys using {@link Bytes#compareTo}. */
        @Override
        public int compare(Bytes bytes, Bytes bytes2) {
            return bytes.compareTo(bytes2);
        }

        // NOTE: the decompiled source also declared a synthetic bridge overload
        // keyFromRecord(ConsumerRecord). The compiler generates that bridge itself; declaring it
        // in source clashes (same erasure) with the typed method above, so it is omitted here.
    }

    /**
     * Creates an uninitialized store; nothing is contacted until {@code init} is called.
     *
     * @param cassandraClient client used for all remote table operations
     * @param tableName       supplies both the Kafka-facing and the Cassandra-facing store name
     * @param remoteMonitor   awaited during initialization before statements are prepared
     * @param admin           Kafka admin client passed through to the commit buffer
     */
    public ResponsiveStore(CassandraClient cassandraClient, TableName tableName, RemoteMonitor remoteMonitor, Admin admin) {
        this.client = cassandraClient;
        this.name = tableName;
        this.initRemote = remoteMonitor;
        this.admin = admin;
    }

    /** Returns the Kafka-facing name of this store. */
    @Override
    public String name() {
        return this.name.kafkaName();
    }

    /**
     * Legacy initialization entry point. Only contexts that also implement
     * {@link StateStoreContext} are accepted; they are forwarded to
     * {@link #init(StateStoreContext, StateStore)}.
     *
     * @throws UnsupportedOperationException if {@code processorContext} is not a
     *                                       {@link StateStoreContext}
     */
    @Override
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        if (!(processorContext instanceof StateStoreContext)) {
            throw new UnsupportedOperationException("Use ResponsiveStore#init(StateStoreContext, StateStore) instead.");
        }
        init((StateStoreContext) processorContext, stateStore);
    }

    /**
     * Initializes the store: creates the remote data table, waits (up to 60 seconds) on the
     * remote monitor, prepares statements, initializes the stored offset for this partition,
     * builds the commit buffer, marks the store open and registers it with the context.
     *
     * @param stateStoreContext context of the owning task; must also be usable as an
     *                          {@link InternalProcessorContext} and a
     *                          {@link RecordCollector.Supplier}
     * @param stateStore        the root store to register with the context
     * @throws ProcessorStateException if initialization is interrupted or times out; when
     *                                 interrupted, the thread's interrupt flag is restored first
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        try {
            LOG.info("Initializing state store {}", this.name);
            this.context = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
            this.partition = stateStoreContext.taskId().partition();

            final String tableName = this.name.cassandraName();
            this.client.createDataTable(tableName);
            this.initRemote.await(REMOTE_INIT_TIMEOUT);
            LOG.info("Remote table {} is available for querying.", tableName);
            this.client.prepareStatements(tableName);
            this.client.initializeOffset(tableName, this.partition);

            final TopicPartition changelog = new TopicPartition(
                    ProcessorContextUtils.changelogFor(stateStoreContext, this.name.kafkaName(), false),
                    this.partition);
            this.buffer = new CommitBuffer<>(
                    this.client, tableName, changelog, asRecordCollector(stateStoreContext), this.admin, PLUGIN);
            this.open = true;

            // The buffer serves as the restore callback, and its flush as the commit callback.
            stateStoreContext.register(stateStore, this.buffer, this.buffer::flush);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ProcessorStateException("Failed to initialize store.", e);
        } catch (TimeoutException e) {
            throw new ProcessorStateException("Failed to initialize store.", e);
        }
    }

    /**
     * Views the context as a {@link RecordCollector.Supplier}.
     * NOTE(review): this is an unchecked downcast and throws {@link ClassCastException} for
     * contexts that do not implement the interface - confirm all runtime contexts do.
     */
    private RecordCollector.Supplier asRecordCollector(StateStoreContext stateStoreContext) {
        return (RecordCollector.Supplier) stateStoreContext;
    }

    /** Returns {@code true} between a successful {@code init} and {@link #close()}. */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /** Always reports {@code false}. */
    @Override
    public boolean persistent() {
        return false;
    }

    /** Stages the write in the commit buffer and advances the store position. */
    @Override
    public void put(Bytes bytes, byte[] bArr) {
        this.buffer.put(bytes, bArr);
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    /**
     * Writes {@code bArr} only if {@link #get(Bytes)} currently yields {@code null}.
     *
     * @return the value that was present before the call, or {@code null} if there was none
     */
    @Override
    public byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        final byte[] existing = get(bytes);
        if (existing == null) {
            put(bytes, bArr);
        }
        return existing;
    }

    /** Applies {@link #put(Bytes, byte[])} to every entry in iteration order. */
    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        for (final KeyValue<Bytes, byte[]> entry : list) {
            put(entry.key, entry.value);
        }
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    /**
     * Records a tombstone for the key in the commit buffer and advances the store position,
     * as {@link #put(Bytes, byte[])} does for ordinary writes.
     *
     * @return the value visible through {@link #get(Bytes)} before the delete, or {@code null}
     */
    @Override
    public byte[] delete(Bytes bytes) {
        final byte[] previous = get(bytes);
        this.buffer.tombstone(bytes);
        StoreQueryUtils.updatePosition(this.position, this.context);
        return previous;
    }

    /**
     * Looks the key up in the commit buffer first; only when the buffer has no entry at all is
     * the remote table queried. A buffered tombstone yields {@code null} without a remote read.
     */
    @Override
    public byte[] get(Bytes bytes) {
        final Result<Bytes> buffered = this.buffer.get(bytes);
        if (buffered == null) {
            return this.client.get(this.name.cassandraName(), this.partition, bytes);
        }
        return buffered.isTombstone ? null : buffered.value;
    }

    /**
     * Returns an iterator built from the buffer's range and the remote table's range for the
     * same bounds, combined by {@code LocalRemoteKvIterator} using {@link #PLUGIN}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        return new LocalRemoteKvIterator(this.buffer.range(bytes, bytes2), this.client.range(this.name.cassandraName(), this.partition, bytes, bytes2), PLUGIN);
    }

    /**
     * Returns an iterator built from all buffered entries and all remote entries of this
     * partition, combined by {@code LocalRemoteKvIterator} using {@link #PLUGIN}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        return new LocalRemoteKvIterator(this.buffer.all(), this.client.all(this.name.cassandraName(), this.partition), PLUGIN);
    }

    /** Returns the position object that write operations on this store advance. */
    @Override
    public Position getPosition() {
        return this.position;
    }

    /**
     * Returns the count reported by the client for this store's table and partition. The commit
     * buffer is not consulted by this method.
     */
    @Override
    public long approximateNumEntries() {
        return this.client.count(this.name.cassandraName(), this.partition);
    }

    /** Flushes the commit buffer. */
    @Override
    public void flush() {
        this.buffer.flush();
    }

    /**
     * Flushes any buffered writes and marks the store as closed. The store is marked closed even
     * if the flush fails, and the flush is skipped when the store was never initialized.
     */
    @Override
    public void close() {
        try {
            if (this.buffer != null) {
                flush();
            }
        } finally {
            this.open = false;
        }
    }
}
