package com.hazelcast.jet.impl.connector.kafka;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.CloseableProcessorSupplier;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.nio.Address;
import com.hazelcast.util.Preconditions;
import java.io.Closeable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/kafka/StreamKafkaP.class */
public final class StreamKafkaP<K, V, T> extends AbstractProcessor implements Closeable {
    /**
     * Partition re-check interval in milliseconds that {@link MetaSupplier} falls back to
     * when the supplied properties contain no {@code metadata.max.age.ms} entry.
     */
    private static final long KAFKA_DEFAULT_REFRESH_INTERVAL = 300000;
    // NOTE(review): not referenced in the visible code; complete() passes the literal 50L
    // to poll() — presumably this constant inlined by the compiler. Confirm.
    private static final int POLL_TIMEOUT_MS = 50;
    /** Properties handed to the {@link KafkaConsumer} constructor in init(). */
    private final Properties properties;
    /** Names of the topics whose partitions are distributed among the processors. */
    private final List<String> topics;
    /** Applied to the key and value of every polled record to produce the emitted item. */
    private final DistributedBiFunction<K, V, T> projectionFn;
    /** Processor count used for the partition-count validation and the partition assignment. */
    private final int globalParallelism;
    /** Read from the processor context in init(); when false, complete() calls commitSync(). */
    private boolean snapshottingEnabled;
    /** Created in init(), closed in close(). */
    private KafkaConsumer<Object, Object> consumer;
    /** Traverser over the offsets being written to the snapshot; null when no save is in progress. */
    private Traverser<Map.Entry<BroadcastKey<TopicPartition>, Long>> snapshotTraverser;
    /** Milliseconds between two partition re-checks (see assignPartitions()). */
    private long metadataRefreshInterval;
    /** Global index of this processor, read from the processor context in init(). */
    private int processorIndex;
    /** Traverser over the projected items of the last polled batch; null when a new poll is needed. */
    private Traverser<T> traverser;
    /** The record most recently pulled from the batch traverser (set by its peek stage). */
    private ConsumerRecord<Object, Object> lastEmittedItem;
    // Decompiler-visible form of the compiler's assertion flag; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** System.nanoTime() value at or after which complete() re-runs assignPartitions(). */
    private long nextPartitionCheck = Long.MIN_VALUE;
    /**
     * Per topic, an array indexed by partition number holding the offset of the last emitted
     * record; -1 marks a partition with no recorded offset.
     */
    private final Map<String, long[]> offsets = new HashMap();
    /** Topic partitions currently assigned to the consumer; only ever added to in the visible code. */
    private Set<TopicPartition> currentAssignment = new HashSet();
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/impl/connector/kafka/StreamKafkaP$KafkaPartitionAssigner.class */
    /**
     * Computes which topic partitions belong to which processor, given the topic names,
     * the partition count of each topic and the total number of processors.
     */
    public static class KafkaPartitionAssigner {
        private final List<String> topics;
        private final List<Integer> partitionCounts;
        private final int globalParallelism;

        /**
         * @param list  topic names
         * @param list2 partition count of each topic, in the same order as {@code list}
         * @param i     total number of processors the partitions are spread over
         */
        KafkaPartitionAssigner(List<String> list, List<Integer> list2, int i) {
            boolean sameLength = list.size() == list2.size();
            Preconditions.checkTrue(sameLength, "Different length between topics and partition counts");
            this.globalParallelism = i;
            this.partitionCounts = list2;
            this.topics = list;
        }

        /**
         * Returns the partitions owned by the processor with global index {@code i},
         * ordered by topic and then by partition number.
         */
        Set<TopicPartition> topicPartitionsFor(int i) {
            Set<TopicPartition> owned = new LinkedHashSet<>();
            int topicIndex = 0;
            for (String topic : topics) {
                int partitionCount = partitionCounts.get(topicIndex);
                for (int partition = 0; partition < partitionCount; partition++) {
                    if (processorIndexFor(topicIndex, partition) == i) {
                        owned.add(new TopicPartition(topic, partition));
                    }
                }
                topicIndex++;
            }
            return owned;
        }

        /**
         * Maps a (topic index, partition) pair to a processor index. Each topic starts at a
         * different processor, shifted by {@code max(1, globalParallelism / topicCount)},
         * and its partitions are then dealt out round-robin.
         */
        private int processorIndexFor(int topicIndex, int partition) {
            int topicStride = Math.max(1, globalParallelism / topics.size());
            int firstProcessor = topicIndex * topicStride;
            return (firstProcessor + partition) % globalParallelism;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/connector/kafka/StreamKafkaP$MetaSupplier.class */
    /**
     * Meta-supplier that creates {@link StreamKafkaP} processors, wrapped in a
     * {@link CloseableProcessorSupplier}.
     *
     * <p>The consumer's {@code metadata.max.age.ms} is always overridden to 1000 ms; the
     * value supplied by the user (or the 5-minute default) is instead used as the interval
     * at which each processor re-checks the partition counts.
     *
     * <p>The compiler-synthesized {@code $deserializeLambda$} method must not be declared
     * by hand: javac generates it for the serializable lambda in {@link #get}.
     */
    public static class MetaSupplier<K, V, T> implements ProcessorMetaSupplier {
        private static final String METADATA_MAX_AGE_KEY = "metadata.max.age.ms";
        private static final String CONSUMER_METADATA_MAX_AGE_MS = "1000";

        private final Properties properties = new Properties();
        private final List<String> topics;
        private final DistributedBiFunction<K, V, T> projectionFn;
        private final long metadataRefreshInterval;
        private int totalParallelism;

        /**
         * @param properties            Kafka consumer properties; copied, the argument is not modified
         * @param list                  names of the topics to read
         * @param distributedBiFunction projection applied to each record's key and value
         * @throws NumberFormatException if {@code metadata.max.age.ms} is present but is not a
         *                               valid long
         */
        public MetaSupplier(Properties properties, List<String> list, DistributedBiFunction<K, V, T> distributedBiFunction) {
            this.properties.putAll(properties);
            this.topics = list;
            this.projectionFn = distributedBiFunction;
            // Use get() rather than getProperty(): the latter returns null for a value that
            // is not a String (e.g. an Integer), which made Long.parseLong fail on null.
            Object configuredMaxAge = properties.get(METADATA_MAX_AGE_KEY);
            this.metadataRefreshInterval = configuredMaxAge != null
                    ? Long.parseLong(configuredMaxAge.toString().trim())
                    : StreamKafkaP.KAFKA_DEFAULT_REFRESH_INTERVAL;
            this.properties.setProperty(METADATA_MAX_AGE_KEY, CONSUMER_METADATA_MAX_AGE_MS);
        }

        /** Returns the preferred number of processors per member, which is 2. */
        public int preferredLocalParallelism() {
            return 2;
        }

        /** Remembers the total parallelism; it is passed to every created processor. */
        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            this.totalParallelism = context.totalParallelism();
        }

        /**
         * Returns a function that gives every member address an equivalent supplier of
         * {@link StreamKafkaP} instances.
         */
        @Nonnull
        public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> list) {
            return address -> new CloseableProcessorSupplier(
                    () -> new StreamKafkaP<>(properties, topics, projectionFn, totalParallelism, metadataRefreshInterval));
        }
    }

    /**
     * Stores the configuration; the Kafka consumer itself is only created in init().
     *
     * @param properties            properties for the Kafka consumer
     * @param list                  names of the topics to read
     * @param distributedBiFunction projection applied to each record's key and value
     * @param i                     global parallelism of the vertex
     * @param j                     interval between partition re-checks, in milliseconds
     */
    StreamKafkaP(
            @Nonnull Properties properties,
            @Nonnull List<String> list,
            @Nonnull DistributedBiFunction<K, V, T> distributedBiFunction,
            int i,
            long j
    ) {
        this.metadataRefreshInterval = j;
        this.globalParallelism = i;
        this.projectionFn = distributedBiFunction;
        this.topics = list;
        this.properties = properties;
    }

    /**
     * Reads the processor index and the snapshotting flag from the context, creates the
     * Kafka consumer and performs the initial partition assignment.
     */
    protected void init(@Nonnull Processor.Context context) throws Exception {
        snapshottingEnabled = context.snapshottingEnabled();
        processorIndex = context.globalProcessorIndex();
        consumer = new KafkaConsumer<>(properties);
        // false: the initial assignment does not seek the partitions to their beginning
        assignPartitions(false);
    }

    /**
     * Queries the current partition count of every topic, assigns any partitions that newly
     * belong to this processor and schedules the next check.
     *
     * @param seekNewToBeginning whether newly assigned partitions are sought to their beginning
     */
    private void assignPartitions(boolean seekNewToBeginning) {
        List<Integer> partitionCounts = topics.stream()
                .map(topic -> consumer.partitionsFor(topic).size())
                .collect(Collectors.toList());
        validateEnoughPartitions(topics, partitionCounts, globalParallelism);

        KafkaPartitionAssigner assigner = new KafkaPartitionAssigner(topics, partitionCounts, globalParallelism);
        Set<TopicPartition> newPartitions = assigner.topicPartitionsFor(processorIndex);
        LoggingUtil.logFinest(getLogger(), "Currently assigned partitions: %s", newPartitions);

        // keep only the partitions that are not assigned yet
        newPartitions.removeAll(currentAssignment);
        if (!newPartitions.isEmpty()) {
            getLogger().info("Partition assignments changed, new partitions: " + newPartitions);
            currentAssignment.addAll(newPartitions);
            consumer.assign(currentAssignment);
            if (seekNewToBeginning) {
                consumer.seekToBeginning(newPartitions);
            }
        }

        createOrExtendOffsetsArrays(partitionCounts);
        nextPartitionCheck = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(metadataRefreshInterval);
    }

    /**
     * Ensures that every topic has an offsets array with at least as many slots as the
     * topic currently has partitions. New slots are initialized to -1 ("no offset yet");
     * offsets already recorded are preserved.
     *
     * <p>An existing array is never shrunk. If a topic reports fewer partitions than before,
     * the larger array is kept: copying it into a shorter one would make
     * {@link System#arraycopy} throw, and the recorded offsets would be lost.
     *
     * @param partitionCounts partition count of each topic, in the same order as {@code topics}
     */
    private void createOrExtendOffsetsArrays(List<Integer> partitionCounts) {
        for (int topicIndex = 0; topicIndex < partitionCounts.size(); topicIndex++) {
            int partitionCount = partitionCounts.get(topicIndex);
            String topic = topics.get(topicIndex);
            long[] current = offsets.get(topic);
            if (current != null && current.length >= partitionCount) {
                // already large enough; nothing to create or extend
                continue;
            }
            long[] extended = new long[partitionCount];
            Arrays.fill(extended, -1L);
            if (current != null) {
                System.arraycopy(current, 0, extended, 0, current.length);
            }
            offsets.put(topic, extended);
        }
    }

    /**
     * Polls Kafka for a batch of records when no batch is in progress, emits the projected
     * items and records the offset of each emitted record. When snapshotting is disabled,
     * the consumer's offsets are committed synchronously on every call that gets past the poll.
     *
     * @return {@code true} only if the poll was interrupted; {@code false} otherwise
     */
    @SuppressWarnings("unchecked")
    public boolean complete() {
        if (System.nanoTime() >= nextPartitionCheck) {
            assignPartitions(true);
        }
        if (!$assertionsDisabled && currentAssignment.isEmpty()) {
            throw new AssertionError("No topic partitions assigned to this processor.");
        }

        if (traverser == null) {
            ConsumerRecords<Object, Object> batch;
            try {
                batch = consumer.poll(POLL_TIMEOUT_MS);
            } catch (InterruptException e) {
                // this is Kafka's unchecked exception, not java.lang.InterruptedException
                return true;
            }
            if (batch.isEmpty()) {
                return false;
            }
            traverser = Traversers.traverseIterable(batch)
                    .peek(polled -> lastEmittedItem = polled)
                    .map(polled -> projectionFn.apply((K) polled.key(), (V) polled.value()))
                    .onFirstNull(() -> traverser = null);
        }

        emitFromTraverser(traverser, emitted -> {
            long[] topicOffsets = offsets.get(lastEmittedItem.topic());
            topicOffsets[lastEmittedItem.partition()] = lastEmittedItem.offset();
        });

        if (!snapshottingEnabled) {
            consumer.commitSync();
        }
        return false;
    }

    /** Closes the Kafka consumer, if one was created. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (consumer == null) {
            return;
        }
        consumer.close();
    }

    /**
     * Always returns {@code false}.
     * NOTE(review): presumably declares the processor non-cooperative because complete()
     * calls the consumer's poll with a 50 ms timeout — confirm against the Processor contract.
     */
    public boolean isCooperative() {
        return false;
    }

    /**
     * Writes every recorded offset (value &gt;= 0) to the snapshot, keyed by a broadcast key
     * wrapping the topic partition. The traverser is kept across calls until it is exhausted.
     *
     * @return the result of {@code emitFromTraverserToSnapshot} for the current traverser
     */
    public boolean saveToSnapshot() {
        if (snapshotTraverser == null) {
            snapshotTraverser = Traversers.traverseStream(
                    offsets.entrySet().stream().flatMap(topicEntry ->
                            IntStream.range(0, topicEntry.getValue().length)
                                    // slots still at -1 have no offset to save
                                    .filter(partition -> topicEntry.getValue()[partition] >= 0)
                                    .mapToObj(partition -> Util.entry(
                                            BroadcastKey.broadcastKey(new TopicPartition(topicEntry.getKey(), partition)),
                                            Long.valueOf(topicEntry.getValue()[partition])))))
                    .onFirstNull(() -> snapshotTraverser = null);
        }
        return emitFromTraverserToSnapshot(snapshotTraverser);
    }

    /**
     * Restores one offset from the snapshot. If the partition is currently assigned to this
     * processor, the offset is recorded and the consumer is positioned at the following
     * offset ({@code offset + 1}); offsets of partitions assigned elsewhere are ignored.
     *
     * <p>An offset for a topic that is not being read, or for a partition number beyond the
     * topic's current partition count, is logged as severe and skipped.
     *
     * @param obj  a {@link BroadcastKey} wrapping the {@link TopicPartition}
     * @param obj2 the saved offset, a {@link Long}
     */
    public void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        TopicPartition topicPartition = (TopicPartition) ((BroadcastKey) obj).key();
        long offset = ((Long) obj2).longValue();
        long[] topicOffsets = this.offsets.get(topicPartition.topic());
        if (topicOffsets == null) {
            getLogger().severe("Offset for topic '" + topicPartition.topic() + "' is present in snapshot, but the topic is not supposed to be read");
            return;
        }
        if (topicPartition.partition() >= topicOffsets.length) {
            getLogger().severe("Offset for partition '" + topicPartition + "' is present in snapshot, but that topic currently has only " + topicOffsets.length + " partitions");
            // Stop here, like the unknown-topic case above: the partition number is not a
            // valid index into topicOffsets.
            return;
        }
        if (this.currentAssignment.contains(topicPartition)) {
            if (!$assertionsDisabled && topicOffsets[topicPartition.partition()] >= 0) {
                throw new AssertionError("duplicate offset for topicPartition '" + topicPartition + "' restored, offset1=" + topicOffsets[topicPartition.partition()] + ", offset2=" + offset);
            }
            topicOffsets[topicPartition.partition()] = offset;
            this.consumer.seek(topicPartition, offset + 1);
        }
    }

    /**
     * Verifies that the topics together have at least as many partitions as there are
     * processors.
     *
     * @param topicNames        topic names
     * @param partitionCounts   partition count of each topic, in the same order as {@code topicNames}
     * @param globalParallelism total number of processors of the vertex
     * @throws JetException if the total partition count is less than {@code globalParallelism}
     */
    private static void validateEnoughPartitions(List<String> topicNames, List<Integer> partitionCounts, int globalParallelism) {
        int totalPartitions = 0;
        for (Integer count : partitionCounts) {
            totalPartitions += count;
        }
        if (totalPartitions >= globalParallelism) {
            return;
        }

        // collect the per-topic counts for the error message
        Map<String, Integer> countsByTopic = new HashMap<>();
        for (int idx = 0; idx < topicNames.size(); idx++) {
            countsByTopic.put(topicNames.get(idx), partitionCounts.get(idx));
        }
        throw new JetException("Total number of Kafka topic partitions (" + totalPartitions
                + ") is less than the global parallelism (" + globalParallelism
                + ") for this vertex.  The partition counts for individual Kafka topics are " + countsByTopic);
    }

    // Decompiler rendering of the assertion-flag setup: the flag is true when assertions
    // are NOT desired for this class, so the `!$assertionsDisabled && ...` checks in
    // complete() and restoreFromSnapshot() can only throw when assertions are enabled.
    static {
        $assertionsDisabled = !StreamKafkaP.class.desiredAssertionStatus();
    }
}
