package com.hazelcast.jet.cdc.impl;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.retry.RetryStrategies;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.jet.retry.impl.RetryTracker;
import com.hazelcast.logging.ILogger;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.HistoryRecord;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.sql.DriverManager;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

/* loaded from: input_file:com/hazelcast/jet/cdc/impl/CdcSourceP.class */
public abstract class CdcSourceP<T> extends AbstractProcessor {
    /** Property key holding the source's name; used as the prefix of the configuration log lines. */
    public static final String NAME_PROPERTY = "name";
    /** Property key holding the class name of the connector instantiated by {@code startNewConnector()}. */
    public static final String CONNECTOR_CLASS_PROPERTY = "connector.class";
    /** Property key for a sequence extractor class; it is not read anywhere in this class. */
    public static final String SEQUENCE_EXTRACTOR_CLASS_PROPERTY = "sequence.extractor.class";
    /** Property key whose value, when present, is cast to a {@link RetryStrategy} instance. */
    public static final String RECONNECT_BEHAVIOR_PROPERTY = "reconnect.behavior";
    /** Property key whose string value is parsed as the commit period in milliseconds. */
    public static final String COMMIT_PERIOD_MILLIS_PROPERTY = "commit.period";
    /** Property key whose string value is parsed as a boolean: whether to drop the state on reconnect. */
    public static final String RECONNECT_RESET_STATE_PROPERTY = "reconnect.reset.state";
    /** Retry strategy used when {@link #RECONNECT_BEHAVIOR_PROPERTY} is not set. */
    public static final RetryStrategy DEFAULT_RECONNECT_BEHAVIOR = RetryStrategies.never();
    /** Commit period (10 seconds, expressed in milliseconds) used when none is configured. */
    public static final long DEFAULT_COMMIT_PERIOD_MS = TimeUnit.SECONDS.toMillis(10);
    /** Key under which the single {@link State} object is written to and read from the snapshot. */
    private static final BroadcastKey<String> SNAPSHOT_KEY = BroadcastKey.broadcastKey("snap");
    /** Hands the history record list over to {@link DatabaseHistoryImpl}, which reads it when instantiated. */
    private static final ThreadLocal<List<byte[]>> THREAD_LOCAL_HISTORY = new ThreadLocal<>();

    /** Connector configuration; also the source of the processor-level settings above. */
    @Nonnull
    private final Properties properties;

    /** Assigns timestamps to emitted events; configured with a single partition in the constructor. */
    @Nonnull
    private final EventTimeMapper<? super T> eventTimeMapper;
    /** Currently running connector, or {@code null} when there is no connection. */
    private SourceConnector connector;
    /** First task configuration produced by the connector. */
    private Map<String, String> taskConfig;
    /** Currently running task, or {@code null} when there is no connection. */
    private SourceTask task;
    /** Tracks failed connection attempts and the wait between retries. */
    private RetryTracker reconnectTracker;
    /** When {@code true}, {@code reconnect()} replaces {@link #state} with a fresh instance. */
    private boolean clearStateOnReconnect;
    /** Traverser over the snapshot entry; non-null only while a snapshot is being written. */
    private Traverser<Map.Entry<BroadcastKey<String>, State>> snapshotTraverser;
    /** {@code true} when the job's processing guarantee is anything other than NONE. */
    private boolean snapshotting;
    /** {@code System.nanoTime()} reading taken at the last periodic commit. */
    private long lastCommitTime;
    /** Commit period in nanoseconds; only initialised when {@link #snapshotting} is {@code false}. */
    private long commitPeriod;
    /** Set by {@code saveToSnapshot()}, cleared by {@code snapshotCommitFinish()}; polling pauses while set. */
    private boolean snapshotInProgress;
    /** Logger obtained from the processor context in {@code init()}. */
    private ILogger logger;
    /** Offsets and history records; saved to and restored from snapshots. */
    private State state = new State();
    /** Items produced by the last poll that still have to be emitted. */
    private Traverser<Object> traverser = Traversers.empty();

    /**
     * Database history that keeps its records in an in-memory list of
     * serialized documents.
     * <p>
     * The backing list is not passed in explicitly: it is taken from
     * {@code CdcSourceP.THREAD_LOCAL_HISTORY} when the instance is created, so
     * the creating thread must have set that thread-local beforehand,
     * otherwise construction fails with a {@link NullPointerException}.
     */
    public static class DatabaseHistoryImpl extends AbstractDatabaseHistory {
        private final List<byte[]> history = Objects.requireNonNull(CdcSourceP.THREAD_LOCAL_HISTORY.get());

        /**
         * Serializes the record's document to bytes and appends it to the list.
         */
        protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
            byte[] serialized = DocumentWriter.defaultWriter().writeAsBytes(historyRecord.document());
            history.add(serialized);
        }

        /**
         * Deserializes every stored record, in list order, and passes it to
         * the consumer. A read failure is rethrown as a {@link RuntimeException}.
         */
        protected void recoverRecords(Consumer<HistoryRecord> consumer) {
            try {
                for (byte[] serialized : history) {
                    consumer.accept(new HistoryRecord(DocumentReader.defaultReader().read(serialized)));
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * @return {@code true} if at least one history record is stored
         */
        public boolean exists() {
            return history != null && !history.isEmpty();
        }

        /**
         * @return {@code true} if the backing list is available
         */
        public boolean storageExists() {
            return history != null;
        }
    }

    /**
     * Connector context handed to the connector in {@code startNewConnector()}.
     * <p>
     * NOTE(review): the decompiler reported that the access modifier of this
     * class was changed from private; confirm against the original source.
     */
    public static class JetConnectorContext implements ConnectorContext {
        private JetConnectorContext() {
        }

        /** Does nothing: reconfiguration requests are ignored. */
        public void requestTaskReconfiguration() {
        }

        /** Rethrows the error reported by the connector via {@code ExceptionUtil.rethrow}. */
        public void raiseError(Exception exc) {
            throw ExceptionUtil.rethrow(exc);
        }
    }

    /**
     * Task context handed to the task in {@code startNewTask()}. It exposes the
     * enclosing processor's task configuration and a reader over its stored
     * offsets.
     */
    public class JetSourceTaskContext implements SourceTaskContext {
        private JetSourceTaskContext() {
        }

        /**
         * @return the task configuration held by the enclosing processor
         */
        public Map<String, String> configs() {
            return taskConfig;
        }

        /**
         * @return a new reader backed by the enclosing processor's current state
         */
        public OffsetStorageReader offsetStorageReader() {
            OffsetStorageReader reader = new SourceOffsetStorageReader();
            return reader;
        }
    }

    /**
     * Offset reader that answers lookups from the enclosing processor's
     * {@link State}.
     */
    private class SourceOffsetStorageReader implements OffsetStorageReader {
        private SourceOffsetStorageReader() {
        }

        /**
         * Looks up the offset of a single partition.
         *
         * @return the stored offset, or {@code null} if none is stored for the partition
         */
        public <V> Map<String, Object> offset(Map<String, V> map) {
            Map<Map<String, V>, Map<String, Object>> found = offsets(Collections.singletonList(map));
            return found.get(map);
        }

        /**
         * Looks up the offsets of all the given partitions.
         *
         * @return a new map with one entry per requested partition; the value
         *         is {@code null} when no offset is stored for that partition
         */
        @SuppressWarnings("unchecked")
        public <V> Map<Map<String, V>, Map<String, Object>> offsets(Collection<Map<String, V>> collection) {
            Map<Map<String, V>, Map<String, Object>> result = new HashMap<>();
            for (Map<String, V> partition : collection) {
                // State stores offsets as Map<String, ?>; the reader API wants Map<String, Object>.
                Map<String, Object> storedOffset = (Map<String, Object>) CdcSourceP.this.state.getOffset(partition);
                result.put(partition, storedOffset);
            }
            return result;
        }
    }

    /**
     * State of the source that is written to and restored from snapshots: the
     * last seen offset for each source partition and the list of serialized
     * database history records.
     */
    public static final class State {
        private final Map<Map<String, ?>, Map<String, ?>> partitionsToOffset;
        private final List<byte[]> historyRecords;

        /**
         * Creates an empty state.
         */
        State() {
            this(new HashMap<>(), new CopyOnWriteArrayList<>());
        }

        /**
         * Creates a state backed by the given collections. They are stored as
         * passed in, without copying.
         *
         * @param map                  offsets keyed by source partition
         * @param copyOnWriteArrayList serialized history records
         */
        public State(Map<Map<String, ?>, Map<String, ?>> map, CopyOnWriteArrayList<byte[]> copyOnWriteArrayList) {
            partitionsToOffset = map;
            historyRecords = copyOnWriteArrayList;
        }

        /**
         * @param map the source partition to look up
         * @return the offset stored for the partition, or {@code null} if there is none
         */
        public Map<String, ?> getOffset(Map<String, ?> map) {
            return partitionsToOffset.get(map);
        }

        /**
         * Stores the offset for a partition, replacing any earlier one.
         *
         * @param map  the source partition
         * @param map2 the offset to remember for it
         */
        public void setOffset(Map<String, ?> map, Map<String, ?> map2) {
            partitionsToOffset.put(map, map2);
        }

        /**
         * @return the live partition-to-offset map, not a copy
         */
        public Map<Map<String, ?>, Map<String, ?>> getPartitionsToOffset() {
            return partitionsToOffset;
        }

        /**
         * @return the live list of serialized history records, not a copy
         */
        public List<byte[]> getHistoryRecords() {
            return historyRecords;
        }
    }

    /**
     * Creates the processor.
     *
     * @param properties      connector configuration, also holding the
     *                        processor-level settings; must not be {@code null}
     * @param eventTimePolicy policy used to build the event time mapper; must
     *                        not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public CdcSourceP(@Nonnull Properties properties, @Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        Objects.requireNonNull(properties, "properties");
        Objects.requireNonNull(eventTimePolicy, "eventTimePolicy");

        this.properties = properties;
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        // All events are mapped with partition index 0, so one partition is enough.
        this.eventTimeMapper.addPartitions(1);
    }

    /**
     * Reads the processor-level settings from the properties, logs them and
     * sets up the reconnect tracker.
     * <p>
     * The commit period is only read when the job runs without a processing
     * guarantee; in that case it also starts the periodic commit clock.
     *
     * @param context the processor context supplying the logger and the
     *                processing guarantee
     */
    protected void init(@Nonnull Processor.Context context) {
        // NOTE(review): return value unused; presumably called to force JDBC driver registration — confirm.
        DriverManager.getDrivers();

        String sourceName = getName(properties);
        logger = context.logger();

        RetryStrategy strategy = getRetryStrategy(properties);
        log(logger, sourceName, "retry strategy", strategy);
        reconnectTracker = new RetryTracker(strategy);

        snapshotting = context.processingGuarantee() != ProcessingGuarantee.NONE;
        if (!snapshotting) {
            commitPeriod = getCommitPeriod(properties);
            log(logger, sourceName, "commit period", commitPeriod);
            if (commitPeriod > 0) {
                lastCommitTime = System.nanoTime();
            }
        }

        clearStateOnReconnect = getClearStateOnReconnect(properties);
        log(logger, sourceName, "clear state on reconnect", clearStateOnReconnect);
    }

    /**
     * Declares this processor as non-cooperative.
     * NOTE(review): presumably because polling the task may block — confirm.
     *
     * @return always {@code false}
     */
    public boolean isCooperative() {
        return false;
    }

    /**
     * Performs one round of work: finishes emitting pending items, makes sure
     * the connection is up, optionally commits, polls the task for new records
     * and emits them.
     * <p>
     * Nothing is polled while previously produced items are still pending, a
     * reconnect back-off is in effect, the connection can't be established or
     * a snapshot is in progress.
     *
     * @return always {@code false}; this source never completes on its own
     */
    public boolean complete() {
        if (!emitFromTraverser(traverser)) {
            return false;
        }
        if (reconnectTracker.needsToWait()) {
            return false;
        }
        if (!isConnectionUp()) {
            return false;
        }
        if (snapshotInProgress) {
            return false;
        }

        try {
            // Without snapshots offsets are committed from here, at most once per commit period.
            if (!snapshotting && commitPeriod > 0) {
                long now = System.nanoTime();
                boolean commitDue = now - lastCommitTime > commitPeriod;
                if (commitDue) {
                    task.commit();
                    lastCommitTime = now;
                }
            }

            List<SourceRecord> records = task.poll();
            if (records == null || records.isEmpty()) {
                traverser = eventTimeMapper.flatMapIdle();
            } else {
                for (SourceRecord polled : records) {
                    state.setOffset(polled.sourcePartition(), polled.sourceOffset());
                    task.commitRecord(polled);
                }
                // A commit period of zero means: commit after every batch.
                if (!snapshotting && commitPeriod == 0) {
                    task.commit();
                }
                traverser = Traversers.traverseIterable(records).flatMap(polled -> {
                    T mapped = map(polled);
                    if (mapped == null) {
                        return Traversers.empty();
                    }
                    return eventTimeMapper.flatMapEvent(mapped, 0, extractTimestamp(polled));
                });
            }
            emitFromTraverser(traverser);
        } catch (InterruptedException e) {
            logger.warning("Interrupted while waiting for data");
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            reconnect(e);
        }
        return false;
    }

    /**
     * Converts a polled source record into the item type emitted by this
     * processor.
     *
     * @param sourceRecord a record returned by the task's poll
     * @return the item to emit, or {@code null} to skip the record (records
     *         mapped to {@code null} produce no output in {@code complete()})
     */
    @Nullable
    protected abstract T map(SourceRecord sourceRecord);

    /**
     * Reacts to a failure that happened while working with an established
     * connection.
     * <p>
     * If the reconnect tracker allows another attempt, the connection is torn
     * down (to be re-created on the next {@code complete()} call), the tracker
     * is reset and, when configured, the state is discarded. Otherwise the
     * connection is shut down and a {@link JetException} is thrown.
     *
     * @param runtimeException the failure that triggered the reconnect
     */
    private void reconnect(RuntimeException runtimeException) {
        if (reconnectTracker.shouldTryAgain()) {
            logger.warning("Connection to database lost, will attempt to reconnect and retry operations from scratch" + getCause(runtimeException), runtimeException);

            killConnection();
            reconnectTracker.reset();
            if (clearStateOnReconnect) {
                state = new State();
            }
            return;
        }
        throw shutDownAndThrow(new JetException("Failed to connect to database" + getCause(runtimeException)));
    }

    /**
     * Tears down the connection and hands the given throwable back, so that
     * callers can write {@code throw shutDownAndThrow(e)}. The method itself
     * does not throw it.
     *
     * @param th the throwable the caller is about to throw
     * @return {@code th}, unchanged
     */
    private <E extends Throwable> E shutDownAndThrow(E th) {
        killConnection();
        return th;
    }

    /**
     * Makes sure that a connector and a task are running, starting whichever
     * is missing.
     * <p>
     * A {@link JetException} (for example a connector or task class that can't
     * be instantiated) is treated as fatal: the connection is shut down and
     * the exception is rethrown. Any other {@link RuntimeException} is handed
     * to {@code handleConnectException}, which either schedules a retry or
     * gives up by throwing.
     *
     * @return {@code true} if the connection is up, {@code false} if starting
     *         it failed and another attempt will be made later
     */
    private boolean isConnectionUp() {
        try {
            if (this.connector == null) {
                this.connector = startNewConnector();
                this.taskConfig = this.connector.taskConfigs(1).get(0);
            }
            if (this.task == null) {
                this.task = startNewTask();
            }
            this.reconnectTracker.reset();
            return true;
        } catch (JetException e) {
            // Must come before the RuntimeException clause: JetException is a
            // RuntimeException, so in the opposite order this clause would be
            // unreachable and fatal errors would be retried.
            throw shutDownAndThrow(e);
        } catch (RuntimeException e) {
            handleConnectException(e);
            return false;
        }
    }

    /**
     * Instantiates the connector class named by the
     * {@code connector.class} property, initializes it with a
     * {@link JetConnectorContext} and starts it with the processor's
     * properties.
     *
     * @return the started connector
     */
    private SourceConnector startNewConnector() {
        String connectorClassName = properties.getProperty(CONNECTOR_CLASS_PROPERTY);
        SourceConnector newConnector = newInstance(connectorClassName, "connector");
        newConnector.initialize(new JetConnectorContext());
        newConnector.start(properties);
        return newConnector;
    }

    /**
     * Instantiates the connector's task class, initializes it with a
     * {@link JetSourceTaskContext} and starts it with the current task
     * configuration.
     * <p>
     * While the task starts, the state's history record list is published
     * through {@code THREAD_LOCAL_HISTORY}; {@link DatabaseHistoryImpl} picks
     * it up from there when it is instantiated (presumably by the task during
     * {@code start()}, on this same thread — confirm against the connector).
     *
     * @return the started task
     */
    private SourceTask startNewTask() {
        SourceTask sourceTask = (SourceTask) newInstance(this.connector.taskClass().getName(), "task");
        sourceTask.initialize(new JetSourceTaskContext());
        THREAD_LOCAL_HISTORY.set(this.state.historyRecords);
        try {
            sourceTask.start(this.taskConfig);
        } finally {
            // Clear the thread-local even when start() fails, so the list
            // doesn't stay attached to this thread.
            THREAD_LOCAL_HISTORY.remove();
        }
        return sourceTask;
    }

    /**
     * Records a failed attempt to start the connector or the task.
     * <p>
     * If the reconnect tracker allows another attempt, a warning with the wait
     * time is logged. Otherwise the connection is shut down and a
     * {@link JetException} is thrown.
     *
     * @param runtimeException the failure raised while connecting
     */
    private void handleConnectException(RuntimeException runtimeException) {
        reconnectTracker.attemptFailed();
        if (reconnectTracker.shouldTryAgain()) {
            long waitTimeMs = reconnectTracker.getNextWaitTimeMs();
            logger.warning("Failed to initialize the connector task, retrying in " + waitTimeMs + "ms" + getCause(runtimeException));
            return;
        }
        throw shutDownAndThrow(new JetException("Failed to connect to database" + getCause(runtimeException)));
    }

    /**
     * Writes the current {@link State} to the snapshot under the fixed
     * snapshot key.
     * <p>
     * Pending output is emitted first. Once that is done the processor is
     * marked as having a snapshot in progress, which pauses polling until
     * {@code snapshotCommitFinish} is called.
     *
     * @return {@code true} once the state has been fully written,
     *         {@code false} if the method has to be called again
     */
    public boolean saveToSnapshot() {
        boolean pendingOutputFlushed = emitFromTraverser(traverser);
        if (!pendingOutputFlushed) {
            return false;
        }

        snapshotInProgress = true;
        if (snapshotTraverser == null) {
            Map.Entry<BroadcastKey<String>, State> snapshotEntry = Util.entry(SNAPSHOT_KEY, state);
            snapshotTraverser = Traversers.singleton(snapshotEntry).onFirstNull(() -> {
                // Traverser exhausted: forget it so the next snapshot builds a new one.
                snapshotTraverser = null;
                getLogger().finest("Finished saving snapshot.");
            });
        }
        return emitFromTraverserToSnapshot(snapshotTraverser);
    }

    /**
     * Called when a snapshot has finished. If it succeeded and a task is
     * running, the task is told to commit. Either way the
     * snapshot-in-progress flag is cleared so that polling resumes.
     *
     * @param z whether the snapshot was successful
     * @return always {@code true}
     */
    public boolean snapshotCommitFinish(boolean z) {
        boolean shouldCommit = z && task != null;
        if (shouldCommit) {
            try {
                task.commit();
            } catch (InterruptedException e) {
                logger.warning("Interrupted while committing");
                // Restore the interrupt flag for whoever runs this processor.
                Thread.currentThread().interrupt();
            }
        }

        snapshotInProgress = false;
        return true;
    }

    /**
     * Restores the processor's {@link State} from a snapshot entry.
     *
     * @param obj  the snapshot key; must equal the fixed snapshot key
     * @param obj2 the snapshot value, which becomes the new state
     * @throws RuntimeException if the key is not the expected one
     */
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (SNAPSHOT_KEY.equals(obj)) {
            state = (State) obj2;
            return;
        }
        throw new RuntimeException("Unexpected key received from snapshot: " + obj);
    }

    /**
     * Stops the task and the connector, if they are running.
     */
    public void close() {
        killConnection();
    }

    /**
     * Stops the task and then the connector, if present, and clears both
     * fields so that the next connection attempt starts from scratch.
     * <p>
     * The connector is stopped and both fields are cleared even if stopping
     * the task throws, so a failed shutdown neither leaks the connector nor
     * leaves a half-stopped task behind for reuse. Exceptions from the
     * {@code stop()} calls still propagate; if both calls throw, the
     * connector's exception is the one that propagates.
     */
    private void killConnection() {
        try {
            if (this.task != null) {
                try {
                    this.task.stop();
                } finally {
                    this.task = null;
                }
            }
        } finally {
            if (this.connector != null) {
                try {
                    this.connector.stop();
                } finally {
                    this.connector = null;
                }
            }
        }
    }

    /**
     * Builds a suffix for log and error messages out of the exception's own
     * message and the message of its direct cause. Each available message is
     * prefixed with {@code ": "}; missing messages are skipped.
     *
     * @param exc the exception to describe
     * @return the suffix, or an empty string if neither message is available
     */
    private static String getCause(Exception exc) {
        StringBuilder description = new StringBuilder();

        String ownMessage = exc.getMessage();
        if (ownMessage != null) {
            description.append(": ").append(ownMessage);
        }

        Throwable nested = exc.getCause();
        if (nested != null) {
            String nestedMessage = nested.getMessage();
            if (nestedMessage != null) {
                description.append(": ").append(nestedMessage);
            }
        }
        return description.toString();
    }

    /**
     * Creates an instance of the named class through its public no-argument
     * constructor, loading the class with the current thread's context class
     * loader.
     * <p>
     * NOTE(review): the decompiler reported that the access modifier of this
     * method was changed from protected; confirm against the original source.
     *
     * @param str  fully qualified name of the class to instantiate
     * @param str2 short description of the class' role (e.g. "connector"),
     *             used only in error messages
     * @param <T>  the type the caller expects; the cast is unchecked
     * @return the new instance
     * @throws JetException if the class can't be found or instantiated; the
     *                      underlying exception is attached as the cause
     */
    public static <T> T newInstance(String str, String str2) throws JetException {
        try {
            Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(str);
            // The caller chooses T; a mismatch surfaces as a ClassCastException at the call site.
            @SuppressWarnings("unchecked")
            T instance = (T) clazz.getConstructor().newInstance();
            return instance;
        } catch (ClassNotFoundException e) {
            throw new JetException(String.format("%s class %s not found", str2, str), e);
        } catch (IllegalAccessException e) {
            throw new JetException(String.format("Default constructor of %s class %s is not accessible", str2, str), e);
        } catch (IllegalArgumentException | NoSuchMethodException e) {
            throw new JetException(String.format("%s class %s has no default constructor", str2, str), e);
        } catch (InstantiationException e) {
            throw new JetException(String.format("%s class %s can't be instantiated", str2, str), e);
        } catch (InvocationTargetException e) {
            // The wrapper's own message is null when it wraps a constructor
            // failure; the useful message is on the target exception.
            Throwable target = e.getCause() != null ? e.getCause() : e;
            throw new JetException(
                    String.format("%s class %s failed on construction: %s", str2, str, target.getMessage()), e);
        }
    }

    /**
     * Reads the source's name from the properties.
     *
     * @param properties the processor's properties
     * @return the value stored under the {@code name} key, or {@code null} if absent
     */
    private static String getName(Properties properties) {
        Object configured = properties.get(NAME_PROPERTY);
        return (String) configured;
    }

    /**
     * Reads the reconnect behavior from the properties. The value is expected
     * to be a {@link RetryStrategy} instance, not a string.
     *
     * @param properties the processor's properties
     * @return the configured strategy, or the default one if none is set
     */
    private static RetryStrategy getRetryStrategy(Properties properties) {
        RetryStrategy configured = (RetryStrategy) properties.get(RECONNECT_BEHAVIOR_PROPERTY);
        if (configured != null) {
            return configured;
        }
        return DEFAULT_RECONNECT_BEHAVIOR;
    }

    /**
     * Reads the commit period from the properties. The configured value is a
     * string holding a number of milliseconds; the default is used when the
     * property is absent.
     *
     * @param properties the processor's properties
     * @return the commit period converted to nanoseconds
     * @throws NumberFormatException if the configured value isn't a valid long
     */
    private static long getCommitPeriod(Properties properties) {
        String configured = (String) properties.get(COMMIT_PERIOD_MILLIS_PROPERTY);
        long periodMillis = DEFAULT_COMMIT_PERIOD_MS;
        if (configured != null) {
            periodMillis = Long.parseLong(configured);
        }
        return TimeUnit.MILLISECONDS.toNanos(periodMillis);
    }

    /**
     * Reads the flag telling whether the state should be discarded on
     * reconnect.
     *
     * @param properties the processor's properties
     * @return {@code true} only if the property is set to "true" (ignoring
     *         case); {@code false} otherwise, including when it is absent
     */
    private static boolean getClearStateOnReconnect(Properties properties) {
        String configured = (String) properties.get(RECONNECT_RESET_STATE_PROPERTY);
        return Boolean.parseBoolean(configured);
    }

    /**
     * Logs the value of a configuration setting at info level, if that level
     * is enabled.
     *
     * @param iLogger the logger to write to
     * @param str     name of the source, used as the message prefix
     * @param str2    human-readable name of the setting
     * @param obj     value of the setting
     */
    private static void log(ILogger iLogger, String str, String str2, Object obj) {
        if (!iLogger.isInfoEnabled()) {
            return;
        }
        String message = str + " has '" + str2 + "' set to '" + obj + "'";
        iLogger.info(message);
    }

    /**
     * Extracts the event timestamp from the record's {@code ts_ms} field.
     *
     * @param sourceRecord the record to inspect; its value is expected to be a
     *                     {@link Struct} whenever the schema has the field
     * @return the field's value, or {@link Long#MIN_VALUE} if the value schema
     *         has no such field or the field is {@code null}
     */
    private static long extractTimestamp(SourceRecord sourceRecord) {
        boolean schemaHasTimestamp = sourceRecord.valueSchema().field("ts_ms") != null;
        if (!schemaHasTimestamp) {
            return Long.MIN_VALUE;
        }
        Long timestamp = ((Struct) sourceRecord.value()).getInt64("ts_ms");
        return timestamp == null ? Long.MIN_VALUE : timestamp.longValue();
    }
}
