package at.grahsl.kafka.connect.mongodb.writemodel.strategy;

import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonString;

/* loaded from: input_file:at/grahsl/kafka/connect/mongodb/writemodel/strategy/MonotonicWritesDefaultStrategy.class */
public class MonotonicWritesDefaultStrategy implements WriteModelStrategy {
    public static final String FIELD_KAFKA_COORDS = "_kafkaCoords";
    public static final String FIELD_TOPIC = "_topic";
    public static final String FIELD_PARTITION = "_partition";
    public static final String FIELD_OFFSET = "_offset";
    private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);

    @Override // at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy
    public WriteModel<BsonDocument> createWriteModel(SinkDocument sinkDocument) {
        throw new DataException("error: the write model strategy " + MonotonicWritesDefaultStrategy.class.getName() + " needs the SinkRecord's data and thus cannot work on the SinkDocument param alone. please use the provided method overloading for this.");
    }

    @Override // at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy
    public WriteModel<BsonDocument> createWriteModel(SinkDocument sinkDocument, SinkRecord sinkRecord) {
        BsonDocument orElseThrow = sinkDocument.getValueDoc().orElseThrow(() -> {
            return new DataException("error: cannot build the WriteModel since the value document was missing unexpectedly");
        });
        orElseThrow.append(FIELD_KAFKA_COORDS, new BsonDocument(FIELD_TOPIC, new BsonString(sinkRecord.topic())).append(FIELD_PARTITION, new BsonInt32(sinkRecord.kafkaPartition().intValue())).append(FIELD_OFFSET, new BsonInt64(sinkRecord.kafkaOffset())));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new BsonDocument("$replaceRoot", new BsonDocument("newRoot", new BsonDocument("$cond", new BsonDocument("if", new BsonDocument("$and", new BsonArray(Arrays.asList(new BsonDocument("$eq", new BsonArray(Arrays.asList(new BsonString("$$ROOT._kafkaCoords._topic"), new BsonString(sinkRecord.topic())))), new BsonDocument("$eq", new BsonArray(Arrays.asList(new BsonString("$$ROOT._kafkaCoords._partition"), new BsonInt32(sinkRecord.kafkaPartition().intValue())))), new BsonDocument("$gte", new BsonArray(Arrays.asList(new BsonString("$$ROOT._kafkaCoords._offset"), new BsonInt64(sinkRecord.kafkaOffset())))))))).append("then", new BsonString("$$ROOT")).append("else", orElseThrow)))));
        return new UpdateOneModel(new BsonDocument("_id", orElseThrow.get("_id")), arrayList, UPDATE_OPTIONS);
    }
}
