package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.outbox;

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.DebeziumCdcHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsDelete;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsInsert;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsUpdate;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.WriteModel;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/outbox/OutboxHandler.class */
public class OutboxHandler extends DebeziumCdcHandler {
    public static final String JSON_DOC_BEFORE_FIELD = "before";
    public static final String JSON_DOC_AFTER_FIELD = "after";
    public static final String JSON_DOC_PAYLOAD_FIELD = "payload";
    public static final String PAYLOAD_DEFAULT_ID_FIELD = "id";
    private static Logger logger = LoggerFactory.getLogger(OutboxHandler.class);

    public OutboxHandler(MongoDbSinkConnectorConfig mongoDbSinkConnectorConfig) {
        super(mongoDbSinkConnectorConfig);
        HashMap hashMap = new HashMap();
        hashMap.put(OperationType.CREATE, new RdbmsInsert());
        hashMap.put(OperationType.READ, new RdbmsInsert());
        hashMap.put(OperationType.UPDATE, new RdbmsUpdate());
        hashMap.put(OperationType.DELETE, new RdbmsDelete());
        registerOperations(hashMap);
    }

    public OutboxHandler(MongoDbSinkConnectorConfig mongoDbSinkConnectorConfig, Map<OperationType, CdcOperation> map) {
        super(mongoDbSinkConnectorConfig);
        registerOperations(map);
    }

    @Override // at.grahsl.kafka.connect.mongodb.cdc.CdcHandler
    public Optional<WriteModel<BsonDocument>> handle(SinkDocument sinkDocument) {
        BsonDocument orElseGet = sinkDocument.getKeyDoc().orElseGet(BsonDocument::new);
        BsonDocument orElseGet2 = sinkDocument.getValueDoc().orElseGet(BsonDocument::new);
        if (orElseGet2.isEmpty()) {
            logger.debug("skipping debezium tombstone event for kafka topic compaction");
            return Optional.empty();
        }
        if (!(getCdcOperation(orElseGet2) instanceof OutboxInsert) || extractAndParseBsonDocumentFromPayloadField(orElseGet2).containsKey("id")) {
            return Optional.ofNullable(getCdcOperation(orElseGet2).perform(new SinkDocument(orElseGet, orElseGet2)));
        }
        logger.debug("skipping debezium cdc outbox event which is missing the default 'id' field and is thus most likely not an aggregate root");
        return Optional.empty();
    }

    protected static BsonDocument extractAndParseBsonDocumentFromPayloadField(BsonDocument bsonDocument) {
        try {
            BsonDocument parse = BsonDocument.parse(bsonDocument.getDocument("after").getString(JSON_DOC_PAYLOAD_FIELD).getValue());
            if (parse.isEmpty()) {
                throw new BsonInvalidOperationException("after contains an empty payload field");
            }
            return parse;
        } catch (BsonInvalidOperationException e) {
            throw new DataException("error: value doc 'payload' field is empty or has invalid type for insert/read operation which seems severely wrong -> defensive actions taken!", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static BsonDocument generateFilterDoc(BsonDocument bsonDocument, BsonDocument bsonDocument2, OperationType operationType) {
        if (operationType.equals(OperationType.CREATE) || operationType.equals(OperationType.READ)) {
            return new BsonDocument("_id", new BsonDocument("id", extractAndParseBsonDocumentFromPayloadField(bsonDocument2).get("id")));
        }
        if (operationType.equals(OperationType.UPDATE)) {
            throw new DataException("error: operation type " + OperationType.UPDATE + " unexpected and not allowed for outbox events");
        }
        if (operationType.equals(OperationType.DELETE)) {
            throw new DataException("error: operation type " + OperationType.DELETE + " must not be handled for outbox events - a no op should be configured for this");
        }
        throw new DataException("error: unknown or invalid operation type " + operationType);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static BsonDocument generateUpsertOrReplaceDoc(BsonDocument bsonDocument, BsonDocument bsonDocument2, BsonDocument bsonDocument3) {
        BsonDocument extractAndParseBsonDocumentFromPayloadField = extractAndParseBsonDocumentFromPayloadField(bsonDocument2);
        BsonDocument bsonDocument4 = new BsonDocument();
        if (bsonDocument3.containsKey("_id")) {
            bsonDocument4.put("_id", bsonDocument3.get("_id"));
        }
        extractAndParseBsonDocumentFromPayloadField.keySet().forEach(str -> {
            bsonDocument4.put(str, extractAndParseBsonDocumentFromPayloadField.get(str));
        });
        return bsonDocument4;
    }
}
