package dev.responsive.kafka.internal.db.mongo;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.WriterFactory;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.bson.Document;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/mongo/MongoKVTable.class */
public class MongoKVTable implements RemoteKVTable<WriteModel<Document>> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoKVTable.class);
    private static final String METADATA_COLLECTION_SUFFIX = "_md";
    private final String name;
    private final MongoCollection<KVDoc> docs;
    private final MongoCollection<MetadataDoc> metadata;
    private final MongoCollection<Document> genericDocs;
    private final MongoCollection<Document> genericMetadata;
    private final ConcurrentMap<Integer, MetadataDoc> metadataRows = new ConcurrentHashMap();

    public MongoKVTable(MongoClient mongoClient, String str) {
        this.name = str;
        MongoDatabase withCodecRegistry = mongoClient.getDatabase(str).withCodecRegistry(CodecRegistries.fromRegistries(new CodecRegistry[]{MongoClientSettings.getDefaultCodecRegistry(), CodecRegistries.fromProviders(new CodecProvider[]{PojoCodecProvider.builder().automatic(true).build()})}));
        this.genericDocs = withCodecRegistry.getCollection(str);
        this.docs = withCodecRegistry.getCollection(str, KVDoc.class);
        this.genericMetadata = withCodecRegistry.getCollection(str + "_md");
        this.metadata = withCodecRegistry.getCollection(str + "_md", MetadataDoc.class);
        this.docs.createIndex(Indexes.descending(new String[]{KVDoc.TOMBSTONE_TS}), new IndexOptions().expireAfter(12L, TimeUnit.HOURS));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriterFactory<Bytes, ?> init(int i) {
        this.metadataRows.computeIfAbsent(Integer.valueOf(i), num -> {
            MetadataDoc metadataDoc = (MetadataDoc) this.metadata.findOneAndUpdate(Filters.eq("partition", Integer.valueOf(i)), Updates.combine(new Bson[]{Updates.setOnInsert("partition", Integer.valueOf(i)), Updates.setOnInsert(MetadataDoc.OFFSET, -1L), Updates.inc("epoch", 1)}), new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
            if (metadataDoc == null) {
                throw new IllegalStateException("Uninitialized metadata for partition " + i);
            }
            LOG.info("Retrieved initial metadata {}", metadataDoc);
            return metadataDoc;
        });
        return new MongoWriterFactory(this, this.genericDocs, this.genericMetadata, i);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public byte[] get(int i, Bytes bytes, long j) {
        KVDoc kVDoc = (KVDoc) this.docs.find(Filters.eq("_id", bytes.get())).first();
        if (kVDoc == null) {
            return null;
        }
        return kVDoc.getValue();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> range(int i, Bytes bytes, Bytes bytes2, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public KeyValueIterator<Bytes, byte[]> all(int i, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriteModel<Document> insert(int i, Bytes bytes, byte[] bArr, long j) {
        long j2 = this.metadataRows.get(Integer.valueOf(i)).epoch;
        return new UpdateOneModel(Filters.and(new Bson[]{Filters.eq("_id", bytes.get()), Filters.lte("epoch", Long.valueOf(j2))}), Updates.combine(new Bson[]{Updates.set(KVDoc.VALUE, bArr), Updates.set("epoch", Long.valueOf(j2)), Updates.unset(KVDoc.TOMBSTONE_TS)}), new UpdateOptions().upsert(true));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriteModel<Document> delete(int i, Bytes bytes) {
        long j = this.metadataRows.get(Integer.valueOf(i)).epoch;
        return new UpdateOneModel(Filters.and(new Bson[]{Filters.eq("_id", bytes.get()), Filters.lte("epoch", Long.valueOf(j))}), Updates.combine(new Bson[]{Updates.unset(KVDoc.VALUE), Updates.set(KVDoc.TOMBSTONE_TS, Date.from(Instant.now())), Updates.set("epoch", Long.valueOf(j))}), new UpdateOptions().upsert(true));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long fetchOffset(int i) {
        MetadataDoc metadataDoc = (MetadataDoc) this.metadata.find(Filters.eq("_id", this.metadataRows.get(Integer.valueOf(i)).id())).first();
        if (metadataDoc == null) {
            throw new IllegalStateException("Expected to find metadata row");
        }
        return metadataDoc.offset;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriteModel<Document> setOffset(int i, long j) {
        long j2 = this.metadataRows.get(Integer.valueOf(i)).epoch;
        return new UpdateOneModel(Filters.and(new Bson[]{Filters.eq("_id", this.metadataRows.get(Integer.valueOf(i)).id()), Filters.lte("epoch", Long.valueOf(j2))}), Updates.combine(new Bson[]{Updates.set(MetadataDoc.OFFSET, Long.valueOf(j)), Updates.set("epoch", Long.valueOf(j2))}));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteKVTable
    public long approximateNumEntries(int i) {
        return 0L;
    }
}
