package com.hazelcast.jet.kafka.impl;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkSourceUtil;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.util.Preconditions;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;

/* loaded from: input_file:com/hazelcast/jet/kafka/impl/StreamKafkaP.class */
public final class StreamKafkaP<K, V, T> extends AbstractProcessor {
    private static final long METADATA_CHECK_INTERVAL_NANOS;
    private static final int POLL_TIMEOUT_MS = 50;
    private final Properties properties;
    private final List<String> topics;
    private final DistributedFunction<? super ConsumerRecord<K, V>, ? extends T> projectionFn;
    private final WatermarkSourceUtil<? super T> watermarkSourceUtil;
    private int totalParallelism;
    private boolean snapshottingEnabled;
    private KafkaConsumer<K, V> consumer;
    private final int[] partitionCounts;
    private Traverser<Map.Entry<BroadcastKey<TopicPartition>, long[]>> snapshotTraverser;
    private int processorIndex;
    static final /* synthetic */ boolean $assertionsDisabled;
    Map<TopicPartition, Integer> currentAssignment = new HashMap();
    private long nextMetadataCheck = Long.MIN_VALUE;
    private final Map<String, long[]> offsets = new HashMap();
    private Traverser<Object> traverser = Traversers.empty();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/kafka/impl/StreamKafkaP$KafkaPartitionAssigner.class */
    public static class KafkaPartitionAssigner {
        private final List<String> topics;
        private final int[] partitionCounts;
        private final int totalParallelism;

        KafkaPartitionAssigner(List<String> list, int[] iArr, int i) {
            Preconditions.checkTrue(list.size() == iArr.length, "Different length between topics and partition counts");
            this.topics = list;
            this.partitionCounts = iArr;
            this.totalParallelism = i;
        }

        Set<TopicPartition> topicPartitionsFor(int i) {
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            for (int i2 = 0; i2 < this.topics.size(); i2++) {
                for (int i3 = 0; i3 < this.partitionCounts[i2]; i3++) {
                    if (processorIndexFor(i2, i3) == i) {
                        linkedHashSet.add(new TopicPartition(this.topics.get(i2), i3));
                    }
                }
            }
            return linkedHashSet;
        }

        private int processorIndexFor(int i, int i2) {
            return ((i * Math.max(1, this.totalParallelism / this.topics.size())) + i2) % this.totalParallelism;
        }
    }

    StreamKafkaP(@Nonnull Properties properties, @Nonnull List<String> list, @Nonnull DistributedFunction<? super ConsumerRecord<K, V>, ? extends T> distributedFunction, @Nonnull WatermarkGenerationParams<? super T> watermarkGenerationParams) {
        this.properties = properties;
        this.topics = list;
        this.projectionFn = distributedFunction;
        this.watermarkSourceUtil = new WatermarkSourceUtil<>(watermarkGenerationParams);
        this.partitionCounts = new int[list.size()];
    }

    public boolean isCooperative() {
        return false;
    }

    protected void init(@Nonnull Processor.Context context) {
        this.processorIndex = context.globalProcessorIndex();
        this.totalParallelism = context.totalParallelism();
        this.snapshottingEnabled = context.snapshottingEnabled();
        this.consumer = new KafkaConsumer<>(this.properties);
        assignPartitions(false);
    }

    private void assignPartitions(boolean z) {
        if (System.nanoTime() < this.nextMetadataCheck) {
            return;
        }
        boolean z2 = true;
        for (int i = 0; i < this.topics.size(); i++) {
            int size = this.consumer.partitionsFor(this.topics.get(i)).size();
            z2 &= this.partitionCounts[i] == size;
            this.partitionCounts[i] = size;
        }
        if (z2) {
            return;
        }
        Set<TopicPartition> set = new KafkaPartitionAssigner(this.topics, this.partitionCounts, this.totalParallelism).topicPartitionsFor(this.processorIndex);
        LoggingUtil.logFinest(getLogger(), "Currently assigned partitions: %s", set);
        set.removeAll(this.currentAssignment.keySet());
        if (!set.isEmpty()) {
            getLogger().info("Partition assignments changed, added partitions: " + set);
            Iterator<TopicPartition> it = set.iterator();
            while (it.hasNext()) {
                this.currentAssignment.put(it.next(), Integer.valueOf(this.currentAssignment.size()));
            }
            this.watermarkSourceUtil.increasePartitionCount(this.currentAssignment.size());
            this.consumer.assign(this.currentAssignment.keySet());
            if (z) {
                this.consumer.seekToBeginning(set);
            }
        }
        createOrExtendOffsetsArrays();
        this.nextMetadataCheck = System.nanoTime() + METADATA_CHECK_INTERVAL_NANOS;
    }

    private void createOrExtendOffsetsArrays() {
        for (int i = 0; i < this.partitionCounts.length; i++) {
            int i2 = this.partitionCounts[i];
            String str = this.topics.get(i);
            long[] jArr = this.offsets.get(str);
            if (jArr == null || jArr.length != i2) {
                long[] jArr2 = new long[i2];
                Arrays.fill(jArr2, -1L);
                if (jArr != null) {
                    System.arraycopy(jArr, 0, jArr2, 0, jArr.length);
                }
                this.offsets.put(str, jArr2);
            }
        }
    }

    public boolean complete() {
        if (!emitFromTraverser(this.traverser)) {
            return false;
        }
        try {
            ConsumerRecords consumerRecords = null;
            assignPartitions(true);
            if (!this.currentAssignment.isEmpty()) {
                consumerRecords = this.consumer.poll(50L);
            }
            this.traverser = isEmpty(consumerRecords) ? this.watermarkSourceUtil.handleNoEvent() : Traversers.traverseIterable(consumerRecords).flatMap(consumerRecord -> {
                this.offsets.get(consumerRecord.topic())[consumerRecord.partition()] = consumerRecord.offset();
                Object apply = this.projectionFn.apply(consumerRecord);
                if (apply == null) {
                    return Traversers.empty();
                }
                return this.watermarkSourceUtil.handleEvent(apply, this.currentAssignment.get(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())).intValue());
            });
            emitFromTraverser(this.traverser);
            if (!this.snapshottingEnabled) {
                this.consumer.commitSync();
            }
            return false;
        } catch (InterruptException e) {
            return false;
        }
    }

    public void close() {
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (InterruptException e) {
            }
        }
    }

    public boolean saveToSnapshot() {
        if (!emitFromTraverser(this.traverser)) {
            return false;
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseStream(this.offsets.entrySet().stream().flatMap(entry -> {
                return IntStream.range(0, ((long[]) entry.getValue()).length).filter(i -> {
                    return ((long[]) entry.getValue())[i] >= 0;
                }).mapToObj(i2 -> {
                    TopicPartition topicPartition = new TopicPartition((String) entry.getKey(), i2);
                    return Util.entry(BroadcastKey.broadcastKey(topicPartition), new long[]{((long[]) entry.getValue())[i2], this.watermarkSourceUtil.getWatermark(this.currentAssignment.get(topicPartition).intValue())});
                });
            })).onFirstNull(() -> {
                this.snapshotTraverser = null;
                if (getLogger().isFineEnabled()) {
                    getLogger().fine("Finished saving snapshot. Saved offsets: " + offsets() + ", Saved watermarks: " + watermarks());
                }
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    public void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        TopicPartition topicPartition = (TopicPartition) ((BroadcastKey) obj).key();
        long[] jArr = (long[]) obj2;
        long j = jArr[0];
        long j2 = jArr[1];
        long[] jArr2 = this.offsets.get(topicPartition.topic());
        if (jArr2 == null) {
            getLogger().severe("Offset for topic '" + topicPartition.topic() + "' is present in snapshot, but the topic is not supposed to be read");
            return;
        }
        if (topicPartition.partition() >= jArr2.length) {
            getLogger().severe("Offset for partition '" + topicPartition + "' is present in snapshot, but that topic currently has only " + jArr2.length + " partitions");
        }
        Integer num = this.currentAssignment.get(topicPartition);
        if (num != null) {
            if (!$assertionsDisabled && jArr2[topicPartition.partition()] >= 0) {
                throw new AssertionError("duplicate offset for topicPartition '" + topicPartition + "' restored, offset1=" + jArr2[topicPartition.partition()] + ", offset2=" + j);
            }
            jArr2[topicPartition.partition()] = j;
            this.consumer.seek(topicPartition, j + 1);
            this.watermarkSourceUtil.restoreWatermark(num.intValue(), j2);
        }
    }

    public boolean finishSnapshotRestore() {
        if (!getLogger().isFineEnabled()) {
            return true;
        }
        getLogger().fine("Finished restoring snapshot. Restored offsets: " + offsets() + " and watermarks:" + watermarks());
        return true;
    }

    private boolean isEmpty(ConsumerRecords<K, V> consumerRecords) {
        return consumerRecords == null || consumerRecords.isEmpty();
    }

    private Map<TopicPartition, Long> offsets() {
        return (Map) this.currentAssignment.keySet().stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return Long.valueOf(this.offsets.get(topicPartition2.topic())[topicPartition2.partition()]);
        }));
    }

    private Map<TopicPartition, Long> watermarks() {
        return (Map) this.currentAssignment.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Long.valueOf(this.watermarkSourceUtil.getWatermark(((Integer) entry.getValue()).intValue()));
        }));
    }

    @Nonnull
    public static <K, V, T> DistributedSupplier<Processor> processorSupplier(@Nonnull Properties properties, @Nonnull List<String> list, @Nonnull DistributedFunction<? super ConsumerRecord<K, V>, ? extends T> distributedFunction, @Nonnull WatermarkGenerationParams<? super T> watermarkGenerationParams) {
        return () -> {
            return new StreamKafkaP(properties, list, distributedFunction, watermarkGenerationParams);
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1858212645:
                if (implMethodName.equals("lambda$processorSupplier$404f50ce$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/DistributedSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaP") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Properties;Ljava/util/List;Lcom/hazelcast/jet/function/DistributedFunction;Lcom/hazelcast/jet/core/WatermarkGenerationParams;)Lcom/hazelcast/jet/core/Processor;")) {
                    Properties properties = (Properties) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    DistributedFunction distributedFunction = (DistributedFunction) serializedLambda.getCapturedArg(2);
                    WatermarkGenerationParams watermarkGenerationParams = (WatermarkGenerationParams) serializedLambda.getCapturedArg(3);
                    return () -> {
                        return new StreamKafkaP(properties, list, distributedFunction, watermarkGenerationParams);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !StreamKafkaP.class.desiredAssertionStatus();
        METADATA_CHECK_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(5L);
    }
}
