package at.grahsl.kafka.connect.mongodb;

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
import at.grahsl.kafka.connect.mongodb.converter.SinkConverter;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import at.grahsl.kafka.connect.mongodb.processor.PostProcessor;
import at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy;
import com.mongodb.BulkWriteException;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:at/grahsl/kafka/connect/mongodb/MongoDbSinkTask.class */
/**
 * Kafka Connect sink task that writes sink records into MongoDB collections using ordered bulk writes.
 *
 * <p>Records handed to {@link #put(Collection)} are grouped by target collection namespace, split into
 * batches, converted into MongoDB write models (either through a CDC handler or through the configured
 * post processor chain and write model strategy) and written with one bulk write per batch.
 *
 * <p>The retry budget read in {@link #start(Map)} is decremented on every failed write and is not
 * replenished by later successful writes.
 */
public class MongoDbSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkTask.class);
    private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(true);

    private MongoDbSinkConnectorConfig sinkConfig;
    private MongoClient mongoClient;
    private MongoDatabase database;
    private int remainingRetries;
    private int deferRetryMs;
    private Map<String, PostProcessor> processorChains;
    private Map<String, CdcHandler> cdcHandlers;
    private Map<String, WriteModelStrategy> writeModelStrategies;
    private Map<String, MongoDbSinkConnectorConfig.RateLimitSettings> rateLimitSettings;
    private Map<String, WriteModelStrategy> deleteOneModelDefaultStrategies;

    /** Collections already looked up, keyed by {@code <database name>.<collection name>}. */
    private final Map<String, MongoCollection<BsonDocument>> cachedCollections = new HashMap<>();
    private final SinkConverter sinkConverter = new SinkConverter();

    /**
     * Returns the version string reported by {@code VersionUtil}.
     *
     * @return the connector version
     */
    public String version() {
        return VersionUtil.getVersion();
    }

    /**
     * Builds the sink configuration from the given properties, opens the MongoDB client and
     * loads all per-collection settings (retries, post processors, CDC handlers, write model
     * strategies and rate limits) from that configuration.
     *
     * @param map the task configuration properties
     */
    public void start(Map<String, String> map) {
        LOGGER.info("starting MongoDB sink task");
        this.sinkConfig = new MongoDbSinkConnectorConfig(map);
        MongoClientURI clientUri = this.sinkConfig.buildClientURI();
        this.mongoClient = new MongoClient(clientUri);
        this.database = this.mongoClient.getDatabase(clientUri.getDatabase());
        this.remainingRetries = this.sinkConfig.getInt(MongoDbSinkConnectorConfig.MONGODB_MAX_NUM_RETRIES_CONF);
        this.deferRetryMs = this.sinkConfig.getInt(MongoDbSinkConnectorConfig.MONGODB_RETRIES_DEFER_TIMEOUT_CONF);
        this.processorChains = this.sinkConfig.buildPostProcessorChains();
        this.cdcHandlers = this.sinkConfig.getCdcHandlers();
        this.writeModelStrategies = this.sinkConfig.getWriteModelStrategies();
        this.rateLimitSettings = this.sinkConfig.getRateLimitSettings();
        this.deleteOneModelDefaultStrategies = this.sinkConfig.getDeleteOneModelDefaultStrategies();
    }

    /**
     * Writes the given sink records to MongoDB, one bulk write per buffered batch, and applies the
     * configured rate limit after each batch.
     *
     * @param collection the sink records of the current poll operation
     * @throws RetriableException if a write failed and retries remain
     * @throws ConnectException if a write failed and no retries remain
     */
    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            LOGGER.debug("no sink records to process for current poll operation");
            return;
        }
        createSinkRecordBatchesPerTopic(collection).forEach((namespace, recordBatches) -> {
            // the map key is "<database name>.<collection name>"; the settings maps are keyed by the part after the first dot
            String collectionName = StringUtils.substringAfter(namespace, ".");
            recordBatches.getBufferedBatches().forEach(batch -> {
                processSinkRecords(this.cachedCollections.get(namespace), batch);
                applyRateLimit(collectionName);
            });
        });
    }

    /**
     * Sleeps for the configured timeout if the rate limit settings for the given collection
     * (or the topic agnostic settings when there is no specific entry) report being triggered.
     */
    private void applyRateLimit(String collectionName) {
        MongoDbSinkConnectorConfig.RateLimitSettings settings = this.rateLimitSettings.getOrDefault(
                collectionName, this.rateLimitSettings.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME));
        if (!settings.isTriggered()) {
            return;
        }
        LOGGER.debug("rate limit settings triggering {}ms defer timeout after processing {} further batches for collection {}",
                settings.getTimeoutMs(), settings.getEveryN(), collectionName);
        try {
            Thread.sleep(settings.getTimeoutMs());
        } catch (InterruptedException e) {
            // restore the interrupt flag so the calling thread can still observe the interruption
            Thread.currentThread().interrupt();
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the write models for one batch and bulk writes them into the given collection.
     *
     * <p>On a {@link MongoException} the failure is logged and the retry budget is decremented;
     * the task context is asked to defer the retry by the configured timeout.
     *
     * @throws RetriableException if the write failed and retries remain
     * @throws ConnectException if the write failed and no retries remain
     */
    private void processSinkRecords(MongoCollection<BsonDocument> mongoCollection, List<SinkRecord> list) {
        String collectionName = mongoCollection.getNamespace().getCollectionName();
        List<? extends WriteModel<BsonDocument>> writeModels = this.sinkConfig.isUsingCdcHandler(collectionName)
                ? buildWriteModelCDC(list, collectionName)
                : buildWriteModel(list, collectionName);
        try {
            if (!writeModels.isEmpty()) {
                LOGGER.debug("bulk writing {} document(s) into collection [{}]",
                        writeModels.size(), mongoCollection.getNamespace().getFullName());
                LOGGER.debug("mongodb bulk write result: {}",
                        mongoCollection.bulkWrite(writeModels, BULK_WRITE_OPTIONS));
            }
        } catch (MongoException e) {
            if (e instanceof BulkWriteException) {
                logBulkWriteFailure((BulkWriteException) e);
            } else {
                LOGGER.error("error on mongodb operation", e);
                LOGGER.error("writing {} document(s) into collection [{}] failed -> remaining retries ({})",
                        writeModels.size(), mongoCollection.getNamespace().getFullName(), this.remainingRetries);
            }
            // the budget is consumed even when giving up, exactly like a post-decrement in the condition
            if (this.remainingRetries-- <= 0) {
                throw new ConnectException("failed to write mongodb documents despite retrying -> GIVING UP! :( :( :(", e);
            }
            LOGGER.debug("deferring retry operation for {}ms", this.deferRetryMs);
            this.context.timeout(this.deferRetryMs);
            throw new RetriableException(e.getMessage(), e);
        }
    }

    /**
     * Logs the details carried by a bulk write failure. The write concern error is only logged
     * when present, because it is absent for failures caused purely by write errors.
     */
    private static void logBulkWriteFailure(BulkWriteException e) {
        LOGGER.error("mongodb bulk write (partially) failed", e);
        LOGGER.error(String.valueOf(e.getWriteResult()));
        LOGGER.error(String.valueOf(e.getWriteErrors()));
        if (e.getWriteConcernError() != null) {
            LOGGER.error(e.getWriteConcernError().toString());
        }
    }

    /**
     * Groups the given sink records by target collection namespace and buffers them into batches.
     *
     * <p>The collection name is read from the configuration for the record's topic; when that is
     * empty, the topic name itself is used. Every collection encountered is looked up once and cached.
     *
     * @param collection the sink records to group
     * @return the buffered batches keyed by {@code <database name>.<collection name>}
     */
    Map<String, MongoDbSinkRecordBatches> createSinkRecordBatchesPerTopic(Collection<SinkRecord> collection) {
        LOGGER.debug("number of sink records to process: {}", collection.size());
        Map<String, MongoDbSinkRecordBatches> batchesPerNamespace = new HashMap<>();
        LOGGER.debug("buffering sink records into grouped topic batches");
        for (SinkRecord sinkRecord : collection) {
            String collectionName = resolveCollectionName(sinkRecord);
            String namespace = this.database.getName() + "." + collectionName;
            this.cachedCollections.computeIfAbsent(namespace,
                    key -> this.database.getCollection(collectionName, BsonDocument.class));
            MongoDbSinkRecordBatches batches = batchesPerNamespace.get(namespace);
            if (batches == null) {
                int maxBatchSize = this.sinkConfig.getInt(MongoDbSinkConnectorConfig.MONGODB_MAX_BATCH_SIZE, collectionName);
                LOGGER.debug("batch size for collection {} is at most {} record(s)", collectionName, maxBatchSize);
                batches = new MongoDbSinkRecordBatches(maxBatchSize, collection.size());
                batchesPerNamespace.put(namespace, batches);
            }
            batches.buffer(sinkRecord);
        }
        return batchesPerNamespace;
    }

    /** Returns the configured collection name for the record's topic, or the topic name if that is empty. */
    private String resolveCollectionName(SinkRecord sinkRecord) {
        String configured = this.sinkConfig.getString(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF, sinkRecord.topic());
        if (!configured.isEmpty()) {
            return configured;
        }
        LOGGER.debug("no explicit collection name mapping found for topic {} and default collection name was empty ", sinkRecord.topic());
        LOGGER.debug("using topic name {} as collection name", sinkRecord.topic());
        return sinkRecord.topic();
    }

    /**
     * Converts the given records into write models using the post processor chain and write model
     * strategy of the given collection (falling back to the topic agnostic entries).
     *
     * <p>A record with a value document yields a write model from the write model strategy. A record
     * without a value document but with a key document yields a delete model when delete-on-null-values
     * is enabled for its topic. Any other record is logged and skipped.
     *
     * @param collection the records to convert
     * @param str the collection name used to look up the per-collection settings
     * @return the write models, in record order
     */
    List<? extends WriteModel<BsonDocument>> buildWriteModel(Collection<SinkRecord> collection, String str) {
        List<WriteModel<BsonDocument>> writeModels = new ArrayList<>(collection.size());
        LOGGER.debug("building write model for {} record(s)", collection.size());
        for (SinkRecord sinkRecord : collection) {
            SinkDocument document = this.sinkConverter.convert(sinkRecord);
            this.processorChains
                    .getOrDefault(str, this.processorChains.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME))
                    .process(document, sinkRecord);
            if (document.getValueDoc().isPresent()) {
                writeModels.add(this.writeModelStrategies
                        .getOrDefault(str, this.writeModelStrategies.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME))
                        .createWriteModel(document, sinkRecord));
            } else if (document.getKeyDoc().isPresent() && this.sinkConfig.isDeleteOnNullValues(sinkRecord.topic())) {
                writeModels.add(this.deleteOneModelDefaultStrategies
                        .getOrDefault(str, this.deleteOneModelDefaultStrategies.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME))
                        .createWriteModel(document));
            } else {
                LOGGER.error("skipping sink record {} for which neither key doc nor value doc were present", sinkRecord);
            }
        }
        return writeModels;
    }

    /**
     * Converts the given records into write models using the CDC handler of the given collection
     * (falling back to the topic agnostic handler). Records for which the handler returns an empty
     * result are dropped.
     *
     * @param collection the records to convert
     * @param str the collection name used to look up the CDC handler
     * @return the write models produced by the handler, in record order
     */
    List<? extends WriteModel<BsonDocument>> buildWriteModelCDC(Collection<SinkRecord> collection, String str) {
        LOGGER.debug("building CDC write model for {} record(s) into collection {}", collection.size(), str);
        CdcHandler handler = this.cdcHandlers.getOrDefault(str, this.cdcHandlers.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME));
        return collection.stream()
                .map(this.sinkConverter::convert)
                .map(handler::handle)
                .flatMap(result -> result.map(Stream::of).orElseGet(Stream::empty))
                .collect(Collectors.toList());
    }

    /**
     * Does nothing; this task performs all writes in {@link #put(Collection)}.
     *
     * @param map the current offsets (unused)
     */
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    /** Closes the MongoDB client if one was created by {@link #start(Map)}. */
    public void stop() {
        LOGGER.info("stopping MongoDB sink task");
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }
}
