package io.continual.services.processor.library.influxdb.sinks;

import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import io.continual.builder.Builder;
import io.continual.metrics.MetricsCatalog;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.library.influxdb.common.IdbConnection;
import io.continual.services.processor.library.influxdb.common.IdbConnector;
import io.continual.util.data.HumanReadableHelper;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.json.JsonVisitor;
import java.time.Instant;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/library/influxdb/sinks/InfluxDbSink.class */
/**
 * A {@link Sink} that converts each processed message into an InfluxDB {@link Point}
 * and hands it to a batching {@link WriteApi}.
 *
 * <p>Configuration keys read by the constructor:
 * <ul>
 *   <li>{@code measurement} (required) - expression evaluated per message for the measurement name</li>
 *   <li>{@code timestamp} (optional, default <code>${timestamp}</code>) - expression evaluated to a
 *       {@code Long} and written with millisecond precision</li>
 *   <li>{@code data} (required) - object mapping field-name expressions to
 *       <code>{ "expr": ..., "type": ... }</code> entries</li>
 *   <li>{@code tags} (optional) - object mapping tag names to value expressions</li>
 *   <li>{@code postBuffer} (optional, default 64) - batch size passed to the write API</li>
 * </ul>
 *
 * <p>All public instance methods are {@code synchronized} on this sink.
 */
public class InfluxDbSink extends IdbConnector implements Sink {
    private static final String kSetting_BufferSize = "postBuffer";
    private static final int kDefault_BufferSize = 64;
    private final String fMeasurementExpr;
    private final String fTimeExpr;
    private final JSONObject fDataFields;
    // May be null: the "tags" setting is optional.
    private final JSONObject fTags;
    private long fRecordCount;
    private final WriteApi fWriteApi;
    private static final Logger log = LoggerFactory.getLogger(InfluxDbSink.class);

    /**
     * Builds a sink from an empty configuration object.
     *
     * @throws Builder.BuildFailure if a required setting is missing
     */
    public InfluxDbSink() throws Builder.BuildFailure {
        this(new JSONObject());
    }

    /**
     * Builds a sink from the given configuration with no load context.
     *
     * @param jSONObject the sink configuration
     * @throws Builder.BuildFailure if a required setting is missing
     */
    public InfluxDbSink(JSONObject jSONObject) throws Builder.BuildFailure {
        this((ConfigLoadContext) null, jSONObject);
    }

    /**
     * Builds a sink, deriving the database connection from the configuration.
     *
     * @param configLoadContext the configuration load context (may be null)
     * @param jSONObject the sink configuration
     * @throws Builder.BuildFailure if a required setting is missing
     */
    public InfluxDbSink(ConfigLoadContext configLoadContext, JSONObject jSONObject) throws Builder.BuildFailure {
        this(dbConnectionFromConfig(configLoadContext, jSONObject), configLoadContext, jSONObject);
    }

    /**
     * Builds a sink on top of an existing connection.
     *
     * @param idbConnection the connection handed to the {@link IdbConnector} superclass
     * @param configLoadContext the configuration load context (may be null)
     * @param jSONObject the sink configuration
     * @throws Builder.BuildFailure if {@code measurement} or {@code data} is missing or malformed
     */
    public InfluxDbSink(IdbConnection idbConnection, ConfigLoadContext configLoadContext, JSONObject jSONObject) throws Builder.BuildFailure {
        super(idbConnection, configLoadContext, jSONObject);
        try {
            this.fMeasurementExpr = jSONObject.getString("measurement");
            this.fTimeExpr = jSONObject.optString("timestamp", "${timestamp}");
            this.fDataFields = jSONObject.getJSONObject("data");
            this.fTags = jSONObject.optJSONObject("tags");
            this.fRecordCount = 0L;

            final int batchSize = jSONObject.optInt(kSetting_BufferSize, kDefault_BufferSize);
            final WriteOptions writeOptions = WriteOptions.builder().batchSize(batchSize).build();
            this.fWriteApi = getDb().getDb().getWriteApi(writeOptions);
        } catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    /** No initialization work is required; the write API is created in the constructor. */
    public synchronized void init() {
    }

    /** Closes the underlying write API and logs the number of records sent. */
    public synchronized void close() {
        flush();
        this.fWriteApi.close();
        log.warn("InfluxDbSink closing; sent {} records.", this.fRecordCount);
    }

    /** Currently a no-op; this method does not touch the write API. */
    public synchronized void flush() {
    }

    /**
     * @return the number of points handed to the write API so far
     */
    public synchronized long getRecordsSent() {
        return this.fRecordCount;
    }

    /**
     * Builds a point from the current message and submits it to the write API.
     *
     * <p>Any exception raised while building or writing the point is logged and reported
     * to the stream processing context via {@code fail(...)}; it is not rethrown.
     *
     * @param messageProcessingContext the context for the message being processed
     */
    public synchronized void process(MessageProcessingContext messageProcessingContext) {
        // try-with-resources guarantees the metrics path is closed on failure as well as
        // on success; a null resource is skipped by the language.
        try (MetricsCatalog.PathPopper push = messageProcessingContext.getStreamProcessingContext().getMetrics().push("InfluxDbSink")) {
            this.fWriteApi.writePoint(buildPoint(messageProcessingContext));
            this.fRecordCount++;
            if (0 == this.fRecordCount % 1000) {
                log.info("{}K msgs written", HumanReadableHelper.numberValue(this.fRecordCount / 1000));
            }
        } catch (Exception e) {
            log.warn("While executing a transaction: {}", e.getMessage(), e);
            messageProcessingContext.getStreamProcessingContext().fail(e.getMessage());
        }
    }

    /**
     * Evaluates the configured measurement, timestamp, field and tag expressions against
     * the message and assembles the resulting point.
     *
     * @param messageProcessingContext the context used to evaluate expressions
     * @return the populated point
     * @throws JSONException if a data entry lacks its {@code expr} setting
     * @throws NumberFormatException if a {@code long} or {@code double} field value does not parse
     */
    private Point buildPoint(final MessageProcessingContext messageProcessingContext) throws JSONException, NumberFormatException {
        final String measurement = messageProcessingContext.evalExpression(this.fMeasurementExpr);
        final Long epochMs = messageProcessingContext.evalExpression(this.fTimeExpr, Long.class, new ExprDataSource[0]);
        final Point point = Point.measurement(measurement).time(Instant.ofEpochMilli(epochMs.longValue()), WritePrecision.MS);

        JsonVisitor.forEachElement(this.fDataFields, new JsonVisitor.ObjectVisitor<JSONObject, NumberFormatException>() {
            @Override
            public boolean visit(String str, JSONObject jSONObject) throws JSONException, NumberFormatException {
                // Both the field name and its value are expressions evaluated per message.
                final String fieldName = messageProcessingContext.evalExpression(str);
                final String valueExpr = jSONObject.getString("expr");
                final String type = jSONObject.optString("type", null);
                final String value = messageProcessingContext.evalExpression(valueExpr);

                if (type == null || type.equalsIgnoreCase("string")) {
                    // An absent type is treated the same as "string".
                    point.addField(fieldName, value);
                } else if (type.equalsIgnoreCase("long")) {
                    point.addField(fieldName, Long.parseLong(value));
                } else if (type.equalsIgnoreCase("double")) {
                    point.addField(fieldName, Double.parseDouble(value));
                }
                // Any other type name: the field is skipped without error.
                return true;
            }
        });

        // Tags are optional; skip the walk entirely when none were configured.
        if (this.fTags != null) {
            JsonVisitor.forEachElement(this.fTags, new JsonVisitor.ObjectVisitor<String, JSONException>() {
                @Override
                public boolean visit(String str, String str2) throws JSONException {
                    point.addTag(str, messageProcessingContext.evalExpression(str2, String.class, new ExprDataSource[0]));
                    return true;
                }
            });
        }
        return point;
    }
}
