package com.hazelcast.jet.kafka.impl;

import com.hazelcast.jet.Job;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.test.TestInbox;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.impl.connector.SinkStressTestUtil;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.map.IMap;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaPTest.class */
public class WriteKafkaPTest extends SimpleTestInClusterSupport {
    private static final int PARTITION_COUNT = 20;
    private static KafkaTestSupport kafkaTestSupport;
    private String sourceIMapName = randomMapName();
    private Properties properties;
    private String topic;
    private IMap<Integer, String> sourceIMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaPTest$ProcessorWithEntryAndLatch.class */
    public static final class ProcessorWithEntryAndLatch extends AbstractProcessor {
        static volatile boolean isDone;
        static volatile boolean allowSnapshot;
        private Traverser<Map.Entry<Integer, String>> t;

        private ProcessorWithEntryAndLatch() {
            this.t = Traversers.singleton(Util.entry(0, "v"));
            isDone = false;
            allowSnapshot = false;
        }

        public boolean isCooperative() {
            return false;
        }

        public boolean saveToSnapshot() {
            return allowSnapshot;
        }

        public boolean complete() {
            return emitFromTraverser(this.t) && isDone;
        }
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        kafkaTestSupport = new KafkaTestSupport();
        kafkaTestSupport.createKafkaCluster();
        initialize(2, null);
    }

    @Before
    public void before() {
        this.properties = new Properties();
        this.properties.setProperty("bootstrap.servers", kafkaTestSupport.getBrokerConnectionString());
        this.properties.setProperty("key.serializer", IntegerSerializer.class.getName());
        this.properties.setProperty("value.serializer", StringSerializer.class.getName());
        this.topic = randomName();
        kafkaTestSupport.createTopic(this.topic, PARTITION_COUNT);
        this.sourceIMap = instance().getMap(this.sourceIMapName);
        for (int i = 0; i < PARTITION_COUNT; i++) {
            this.sourceIMap.put(Integer.valueOf(i), String.valueOf(i));
        }
    }

    @AfterClass
    public static void afterClass() {
        kafkaTestSupport.shutdownKafkaCluster();
        kafkaTestSupport = null;
    }

    @Test
    public void testWriteToTopic() {
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.map(this.sourceIMap)).writeTo(KafkaSinks.kafka(this.properties, this.topic));
        instance().getJet().newJob(create).join();
        kafkaTestSupport.assertTopicContentsEventually(this.topic, this.sourceIMap, false);
    }

    @Test
    public void testWriteToSpecificPartitions() {
        String str = this.topic;
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.map(this.sourceIMap)).writeTo(KafkaSinks.kafka(this.properties, entry -> {
            return new ProducerRecord(str, (Integer) entry.getKey(), entry.getKey(), entry.getValue());
        }));
        instance().getJet().newJob(create).join();
        kafkaTestSupport.assertTopicContentsEventually(this.topic, this.sourceIMap, true);
    }

    @Test
    public void when_recordLingerEnabled_then_sentOnCompletion() {
        this.properties.setProperty("linger.ms", "3600000");
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.batchFromProcessor("source", ProcessorMetaSupplier.of(() -> {
            return new ProcessorWithEntryAndLatch();
        }))).writeTo(KafkaSinks.kafka(this.properties, this.topic));
        Job newJob = instance().getJet().newJob(create);
        KafkaConsumer<Integer, String> createConsumer = kafkaTestSupport.createConsumer(this.topic);
        Throwable th = null;
        try {
            assertTrueAllTheTime(() -> {
                Assert.assertEquals(0L, createConsumer.poll(Duration.ofMillis(100L)).count());
            }, 2L);
            if (createConsumer != null) {
                if (0 != 0) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createConsumer.close();
                }
            }
            ProcessorWithEntryAndLatch.isDone = true;
            newJob.join();
            this.logger.info("Job finished");
            kafkaTestSupport.assertTopicContentsEventually(this.topic, Collections.singletonMap(0, "v"), false);
        } catch (Throwable th3) {
            if (createConsumer != null) {
                if (0 != 0) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void when_processingGuaranteeOn_then_lingeringRecordsSentOnSnapshot_exactlyOnce() {
        when_processingGuaranteeOn_then_lingeringRecordsSentOnSnapshot(true);
    }

    @Test
    public void when_processingGuaranteeOn_then_lingeringRecordsSentOnSnapshot_atLeastOnce() {
        when_processingGuaranteeOn_then_lingeringRecordsSentOnSnapshot(false);
    }

    private void when_processingGuaranteeOn_then_lingeringRecordsSentOnSnapshot(boolean z) {
        this.properties.setProperty("linger.ms", "" + TimeUnit.HOURS.toMillis(1L));
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.batchFromProcessor("source", ProcessorMetaSupplier.of(() -> {
            return new ProcessorWithEntryAndLatch();
        }))).writeTo(KafkaSinks.kafka(this.properties).topic(this.topic).extractKeyFn((v0) -> {
            return v0.getKey();
        }).extractValueFn((v0) -> {
            return v0.getValue();
        }).exactlyOnce(z).build());
        Job newJob = instance().getJet().newJob(create, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(4000L));
        KafkaConsumer<Integer, String> createConsumer = kafkaTestSupport.createConsumer(this.topic);
        Throwable th = null;
        try {
            try {
                assertTrueAllTheTime(() -> {
                    Assert.assertEquals(0L, createConsumer.poll(Duration.ofMillis(100L)).count());
                }, 2L);
                if (createConsumer != null) {
                    if (0 != 0) {
                        try {
                            createConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumer.close();
                    }
                }
                ProcessorWithEntryAndLatch.allowSnapshot = true;
                kafkaTestSupport.assertTopicContentsEventually(this.topic, Collections.singletonMap(0, "v"), false);
                ProcessorWithEntryAndLatch.isDone = true;
                newJob.join();
            } finally {
            }
        } catch (Throwable th3) {
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void stressTest_graceful_exOnce() {
        stressTest(true, true);
    }

    @Test
    public void stressTest_forceful_exOnce() {
        stressTest(false, true);
    }

    @Test
    public void stressTest_graceful_atLeastOnce() {
        stressTest(false, false);
    }

    @Test
    public void stressTest_forceful_atLeastOnce() {
        stressTest(false, false);
    }

    private void stressTest(boolean z, boolean z2) {
        String str = this.topic;
        Sink build = KafkaSinks.kafka(this.properties).toRecordFn(num -> {
            return new ProducerRecord(str, 0, (Object) null, num.toString());
        }).exactlyOnce(z2).build();
        KafkaConsumer<Integer, String> createConsumer = kafkaTestSupport.createConsumer(this.topic);
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                SinkStressTestUtil.test_withRestarts(instance(), this.logger, build, z, z2, () -> {
                    while (true) {
                        ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(10L));
                        if (poll.isEmpty()) {
                            return arrayList;
                        }
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            arrayList.add(Integer.valueOf(Integer.parseInt((String) ((ConsumerRecord) it.next()).value())));
                        }
                    }
                });
                if (createConsumer != null) {
                    if (0 == 0) {
                        createConsumer.close();
                        return;
                    }
                    try {
                        createConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void test_resumeTransaction() throws Exception {
        this.properties.put("transactional.id", "txn.resumeTransactionTest");
        KafkaProducer kafkaProducer = new KafkaProducer(this.properties);
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        kafkaProducer.send(new ProducerRecord(this.topic, 0, (Object) null, "0")).get();
        kafkaProducer.send(new ProducerRecord(this.topic, 0, (Object) null, "1")).get();
        long producerId = ResumeTransactionUtil.getProducerId(kafkaProducer);
        short epoch = ResumeTransactionUtil.getEpoch(kafkaProducer);
        kafkaProducer.close();
        KafkaConsumer<Integer, String> createConsumer = kafkaTestSupport.createConsumer(this.topic);
        Assert.assertEquals(0L, createConsumer.poll(Duration.ofSeconds(2L)).count());
        KafkaProducer kafkaProducer2 = new KafkaProducer(this.properties);
        ResumeTransactionUtil.resumeTransaction(kafkaProducer2, producerId, epoch, this.properties.getProperty("transactional.id"));
        kafkaProducer2.commitTransaction();
        StringBuilder sb = new StringBuilder();
        int i = 0;
        while (i < 2) {
            Iterator it = createConsumer.poll(Duration.ofSeconds(2L)).iterator();
            while (it.hasNext()) {
                sb.append((String) ((ConsumerRecord) it.next()).value()).append('\n');
                i++;
            }
            this.logger.info("Received " + i + " records so far");
        }
        Assert.assertEquals("0\n1\n", sb.toString());
        kafkaProducer2.close();
        createConsumer.close();
    }

    @Test
    public void when_transactionRolledBackHeuristically_then_sinkIgnoresIt() throws Exception {
        this.properties.setProperty("transaction.timeout.ms", String.valueOf(2000));
        Processor processor = (Processor) WriteKafkaP.supplier(this.properties, obj -> {
            return new ProducerRecord(this.topic, obj);
        }, true).get();
        TestOutbox testOutbox = new TestOutbox(new int[0], 1024);
        TestProcessorContext processingGuarantee = new TestProcessorContext().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        processor.init(testOutbox, processingGuarantee);
        TestInbox testInbox = new TestInbox();
        testInbox.add("foo");
        processor.process(0, testInbox);
        Assert.assertEquals("inbox size", 0L, testInbox.size());
        Assert.assertTrue(processor.saveToSnapshot());
        processor.close();
        testInbox.addAll(testOutbox.snapshotQueue());
        sleepMillis(2000 + 1000);
        Processor processor2 = (Processor) WriteKafkaP.supplier(this.properties, obj2 -> {
            return new ProducerRecord(this.topic, obj2);
        }, true).get();
        processor2.init(testOutbox, processingGuarantee);
        processor2.restoreFromSnapshot(testInbox);
        processor2.finishSnapshotRestore();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = false;
                    break;
                }
                break;
            case -1034812236:
                if (implMethodName.equals("lambda$testWriteToSpecificPartitions$7b33f52a$1")) {
                    z = 5;
                    break;
                }
                break;
            case -183894549:
                if (implMethodName.equals("lambda$when_processingGuaranteeOn_then_lingeringRecordsSentOnSnapshot$65636dda$1")) {
                    z = 2;
                    break;
                }
                break;
            case 616345206:
                if (implMethodName.equals("lambda$stressTest$d07861cc$1")) {
                    z = 6;
                    break;
                }
                break;
            case 744055259:
                if (implMethodName.equals("lambda$stressTest$a187cfa$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1566906742:
                if (implMethodName.equals("lambda$when_recordLingerEnabled_then_sentOnCompletion$fb1a34a4$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaPTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new ProcessorWithEntryAndLatch();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/KafkaConsumer;Ljava/util/List;)Ljava/util/List;")) {
                    KafkaConsumer kafkaConsumer = (KafkaConsumer) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return () -> {
                        while (true) {
                            ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(10L));
                            if (poll.isEmpty()) {
                                return list;
                            }
                            Iterator it = poll.iterator();
                            while (it.hasNext()) {
                                list.add(Integer.valueOf(Integer.parseInt((String) ((ConsumerRecord) it.next()).value())));
                            }
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaPTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new ProcessorWithEntryAndLatch();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/util/Map$Entry;)Lorg/apache/kafka/clients/producer/ProducerRecord;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return entry -> {
                        return new ProducerRecord(str, (Integer) entry.getKey(), entry.getKey(), entry.getValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Integer;)Lorg/apache/kafka/clients/producer/ProducerRecord;")) {
                    String str2 = (String) serializedLambda.getCapturedArg(0);
                    return num -> {
                        return new ProducerRecord(str2, 0, (Object) null, num.toString());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
