package dev.responsive.kafka.internal.db.mongo;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.WriteModel;
import dev.responsive.kafka.internal.db.RemoteTable;
import dev.responsive.kafka.internal.db.RemoteWriter;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/mongo/MongoWriter.class */
public class MongoWriter<K> implements RemoteWriter<K, Integer> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);
    private final RemoteTable<K, WriteModel<Document>> table;
    private final int kafkaPartition;
    private final MongoCollection<Document> collection;
    private final List<WriteModel<Document>> accumulatedWrites = new ArrayList();

    public MongoWriter(RemoteTable<K, WriteModel<Document>> remoteTable, int i, MongoCollection<Document> mongoCollection) {
        this.table = remoteTable;
        this.kafkaPartition = i;
        this.collection = mongoCollection;
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public void insert(K k, byte[] bArr, long j) {
        this.accumulatedWrites.add(this.table.insert(this.kafkaPartition, k, bArr, j));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public void delete(K k) {
        this.accumulatedWrites.add(this.table.delete(this.kafkaPartition, k));
    }

    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public CompletionStage<RemoteWriteResult<Integer>> flush() {
        try {
            this.collection.bulkWrite(this.accumulatedWrites);
            this.accumulatedWrites.clear();
            return CompletableFuture.completedFuture(RemoteWriteResult.success(Integer.valueOf(this.kafkaPartition)));
        } catch (MongoBulkWriteException e) {
            LOG.error("Failed to flush to {}[{}]. If the exception contains 'E11000 duplicate key', then it was likely this writer was fenced", new Object[]{this.table.name(), Integer.valueOf(this.kafkaPartition), e});
            return CompletableFuture.completedFuture(RemoteWriteResult.failure(Integer.valueOf(this.kafkaPartition)));
        }
    }
}
