package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.db.BatchFlusher;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.RemoteTableSpecFactory;
import dev.responsive.kafka.internal.db.RemoteWindowedTable;
import dev.responsive.kafka.internal.db.WindowFlushManager;
import dev.responsive.kafka.internal.db.WindowedKeySpec;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.Result;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.StoreUtil;
import dev.responsive.kafka.internal.utils.TableName;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/SegmentedOperations.class */
/**
 * {@link WindowOperations} implementation that serves window-store reads and writes from a
 * local {@link CommitBuffer} layered on top of a {@link RemoteWindowedTable}.
 *
 * <p>Writes ({@link #put} / {@link #delete}) only go to the buffer. Point and single-key range
 * reads consult the buffer together with the remote table. Multi-key range scans and full scans
 * are not implemented and throw {@link UnsupportedOperationException}.
 */
public class SegmentedOperations implements WindowOperations {
    private final InternalProcessorContext context;
    private final ResponsiveWindowParams params;
    private final RemoteWindowedTable<?> table;
    private final CommitBuffer<WindowedKey, ?> buffer;
    private final TopicPartition changelog;
    private final ResponsiveStoreRegistry storeRegistry;
    private final ResponsiveStoreRegistration registration;
    private final ResponsiveRestoreListener restoreListener;
    private final long initialStreamTime;

    /**
     * Creates the remote table for the configured storage backend, initializes it for this
     * task's changelog partition, builds the commit buffer, registers the store with the
     * {@link ResponsiveStoreRegistry} and returns the fully wired operations object.
     *
     * @param name             table name; its Kafka name is used for the changelog lookup, the
     *                         log prefix and the store registration
     * @param storeContext     state store context of the owning task
     * @param params           window store parameters (retention, segment count, schema type)
     * @param appConfigs       config map from which the {@link SessionClients} and the
     *                         {@link ResponsiveStoreRegistry} are loaded
     * @param responsiveConfig configuration handed to the commit buffer
     * @param keyPredicate     predicate handed to the {@link WindowedKeySpec}
     *                         (presumably a retention filter on keys; confirm against caller)
     * @return operations bound to the task's changelog partition
     * @throws IllegalStateException if the storage backend is not one of the handled values
     * @throws InterruptedException  propagated from remote table creation
     * @throws TimeoutException      propagated from remote table creation
     */
    public static SegmentedOperations create(final TableName name, final StateStoreContext storeContext, final ResponsiveWindowParams params, final Map<String, Object> appConfigs, final ResponsiveConfig responsiveConfig, final Predicate<WindowedKey> keyPredicate) throws InterruptedException, TimeoutException {
        final Logger log = new LogContext(String.format("window-store [%s] ", name.kafkaName())).logger(SegmentedOperations.class);
        final InternalProcessorContext context = ProcessorContextUtils.asInternalProcessorContext(storeContext);
        final SessionClients sessionClients = InternalSessionConfigs.loadSessionClients(appConfigs);
        final ResponsiveStoreRegistry storeRegistry = InternalSessionConfigs.loadStoreRegistry(appConfigs);
        final TopicPartition changelog = new TopicPartition(ProcessorContextUtils.changelogFor(storeContext, name.kafkaName(), false), context.taskId().partition());
        final WindowSegmentPartitioner partitioner = new WindowSegmentPartitioner(params.retentionPeriod(), StoreUtil.computeSegmentInterval(params.retentionPeriod(), params.numSegments()));

        final RemoteWindowedTable<?> table;
        switch (sessionClients.storageBackend()) {
            case CASSANDRA:
                table = createCassandra(params, sessionClients, partitioner);
                break;
            case MONGO_DB:
                table = createMongo(params, sessionClients, partitioner);
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend());
        }

        final WindowFlushManager flushManager = table.init(changelog.partition());
        log.info("Remote table {} is available for querying.", name.tableName());

        final WindowedKeySpec keySpec = new WindowedKeySpec(keyPredicate);
        // NOTE(review): BatchFlusher is constructed raw because its type parameters are not
        // visible from this file; parameterize it once they are confirmed.
        @SuppressWarnings({"rawtypes", "unchecked"})
        final CommitBuffer<WindowedKey, ?> buffer = CommitBuffer.from(new BatchFlusher(keySpec, changelog.partition(), flushManager), sessionClients, changelog, keySpec, params.name(), responsiveConfig);

        // An offset of -1 from the remote table is mapped to "no start offset".
        final long storedOffset = table.fetchOffset(changelog.partition());
        final OptionalLong startOffset = storedOffset == -1L ? OptionalLong.empty() : OptionalLong.of(storedOffset);

        // The registration callback flushes the commit buffer with the offset it is given.
        final ResponsiveStoreRegistration registration = new ResponsiveStoreRegistration(name.kafkaName(), changelog, startOffset, buffer::flush, StoreUtil.streamThreadId());
        storeRegistry.registerStore(registration);

        return new SegmentedOperations(context, params, table, buffer, changelog, storeRegistry, registration, sessionClients.restoreListener(), flushManager.streamTime());
    }

    /**
     * Creates the Cassandra-backed windowed table for the given parameters.
     *
     * @throws UnsupportedOperationException if the schema type is {@code STREAM}
     * @throws IllegalArgumentException      if the schema type is otherwise unhandled
     */
    private static RemoteWindowedTable<?> createCassandra(final ResponsiveWindowParams params, final SessionClients sessionClients, final WindowSegmentPartitioner partitioner) throws InterruptedException, TimeoutException {
        final CassandraClient client = sessionClients.cassandraClient();
        final RemoteTableSpec spec = RemoteTableSpecFactory.fromWindowParams(params, partitioner);
        switch (params.schemaType()) {
            case WINDOW:
                return client.windowedFactory().create(spec, partitioner);
            case STREAM:
                throw new UnsupportedOperationException("Not yet implemented");
            default:
                throw new IllegalArgumentException(params.schemaType().name());
        }
    }

    /**
     * Creates the MongoDB-backed windowed table for the given parameters.
     *
     * @throws UnsupportedOperationException if the schema type is {@code STREAM}
     * @throws IllegalArgumentException      if the schema type is otherwise unhandled
     */
    private static RemoteWindowedTable<?> createMongo(final ResponsiveWindowParams params, final SessionClients sessionClients, final WindowSegmentPartitioner partitioner) throws InterruptedException, TimeoutException {
        final ResponsiveMongoClient client = sessionClients.mongoClient();
        switch (params.schemaType()) {
            case WINDOW:
                return client.windowedTable(params.name().tableName(), partitioner);
            case STREAM:
                throw new UnsupportedOperationException("Not yet implemented");
            default:
                throw new IllegalArgumentException(params.schemaType().name());
        }
    }

    /**
     * Plain field-assigning constructor; performs no initialization or registration itself.
     * {@link #create} is the factory that wires all collaborators together.
     *
     * @param context           processor context used to timestamp buffered writes
     * @param params            window store parameters
     * @param table             remote table consulted on reads
     * @param buffer            commit buffer that receives all writes
     * @param changelog         changelog topic partition of this store
     * @param storeRegistry     registry the store is deregistered from on {@link #close()}
     * @param registration      this store's registration in {@code storeRegistry}
     * @param restoreListener   listener notified when the store is closed
     * @param initialStreamTime value returned by {@link #initialStreamTime()}
     */
    public SegmentedOperations(final InternalProcessorContext context, final ResponsiveWindowParams params, final RemoteWindowedTable<?> table, final CommitBuffer<WindowedKey, ?> buffer, final TopicPartition changelog, final ResponsiveStoreRegistry storeRegistry, final ResponsiveStoreRegistration registration, final ResponsiveRestoreListener restoreListener, final long initialStreamTime) {
        this.context = context;
        this.params = params;
        this.table = table;
        this.buffer = buffer;
        this.changelog = changelog;
        this.storeRegistry = storeRegistry;
        this.registration = registration;
        this.restoreListener = restoreListener;
        this.initialStreamTime = initialStreamTime;
    }

    /** Returns the stream time supplied at construction. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public long initialStreamTime() {
        return this.initialStreamTime;
    }

    /**
     * Buffers a write for the given key and window, stamped with the context's current
     * record timestamp.
     */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public void put(final Bytes key, final byte[] value, final long windowStartTime) {
        this.buffer.put(new WindowedKey(key, windowStartTime), value, this.context.timestamp());
    }

    /**
     * Buffers a tombstone for the given key and window, stamped with the context's current
     * record timestamp.
     */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public void delete(final Bytes key, final long windowStartTime) {
        this.buffer.tombstone(new WindowedKey(key, windowStartTime), this.context.timestamp());
    }

    /**
     * Point lookup: the buffer is consulted first and the remote table is queried only when
     * the buffer has no entry for the windowed key.
     *
     * @return the value, or {@code null} when the buffered entry is a tombstone
     *         (a remote lookup returns whatever the table returns)
     */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public byte[] fetch(final Bytes key, final long windowStartTime) {
        final Result<WindowedKey> buffered = this.buffer.get(new WindowedKey(key, windowStartTime));
        if (buffered == null) {
            return this.table.fetch(this.changelog.partition(), key, windowStartTime);
        }
        return buffered.isTombstone ? null : buffered.value;
    }

    /**
     * Range lookup for a single key: combines the buffered range with the remote table's
     * range through a {@link LocalRemoteKvIterator}.
     */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    @SuppressWarnings({"rawtypes", "unchecked"}) // LocalRemoteKvIterator type arguments are not visible here
    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
        final WindowedKey from = new WindowedKey(key, timeFrom);
        final WindowedKey to = new WindowedKey(key, timeTo);
        return Iterators.windowed(new LocalRemoteKvIterator(this.buffer.range(from, to), this.table.fetch(this.changelog.partition(), key, timeFrom, timeTo)));
    }

    /** Not implemented; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo) {
        throw new UnsupportedOperationException("Not yet implemented.");
    }

    /** Not implemented; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /** Not implemented; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Backward range lookup for a single key: combines the buffer's {@code backRange} with the
     * remote table's {@code backFetch} through a {@link LocalRemoteKvIterator}.
     */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    @SuppressWarnings({"rawtypes", "unchecked"}) // LocalRemoteKvIterator type arguments are not visible here
    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final long timeFrom, final long timeTo) {
        final WindowedKey from = new WindowedKey(key, timeFrom);
        final WindowedKey to = new WindowedKey(key, timeTo);
        return Iterators.windowed(new LocalRemoteKvIterator(this.buffer.backRange(from, to), this.table.backFetch(this.changelog.partition(), key, timeFrom, timeTo)));
    }

    /** Not implemented; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /** Not implemented; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /** Not implemented; always throws {@link UnsupportedOperationException}. */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Closes the commit buffer, notifies the restore listener and deregisters the store.
     *
     * <p>The notification and the deregistration run in {@code finally} blocks so that a
     * failure while closing the buffer does not leave a stale registration behind; the
     * original exception still propagates to the caller.
     */
    @Override // dev.responsive.kafka.internal.stores.WindowOperations, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.buffer.close();
        } finally {
            try {
                this.restoreListener.onStoreClosed(this.changelog, this.params.name().kafkaName());
            } finally {
                this.storeRegistry.deregisterStore(this.registration);
            }
        }
    }

    /** Hands a batch of changelog records to the commit buffer for restoration. */
    public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
        this.buffer.restoreBatch(records);
    }
}
