package com.hazelcast.jet.kafka.impl;

import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.jet.Util;
import com.hazelcast.test.HazelcastTestSupport;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.common.TopicAndPartition;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import scala.Option;
import scala.collection.JavaConversions;

/* loaded from: input_file:com/hazelcast/jet/kafka/impl/KafkaTestSupport.class */
/**
 * Test helper that runs a single-broker Kafka cluster (embedded ZooKeeper plus
 * one Kafka server) inside the test JVM and offers convenience methods to
 * create topics, produce records and assert on topic contents.
 *
 * <p>Typical usage: call {@link #createKafkaCluster()} before the test and
 * {@link #shutdownKafkaCluster()} afterwards. The class performs no
 * synchronization of its own.
 */
public class KafkaTestSupport {
    private static final String ZK_HOST = "127.0.0.1";
    private static final String BROKER_HOST = "127.0.0.1";
    private static final int SESSION_TIMEOUT = 30000;
    private static final int CONNECTION_TIMEOUT = 30000;

    /** How long the {@code assertTopicContentsEventually} methods wait for the expected records. */
    private static final long ASSERT_TIMEOUT_SECONDS = 10L;
    /** Duration of a single consumer poll while waiting for records. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    private EmbeddedZookeeper zkServer;
    private String zkConnect;
    private ZkUtils zkUtils;
    private KafkaServer kafkaServer;
    /** Lazily created producer with {@code Integer} keys; see {@link #getProducer()}. */
    private KafkaProducer<Integer, String> producer;
    /** Lazily created producer with {@code String} keys; see {@link #getStringStringProducer()}. */
    private KafkaProducer<String, String> stringStringProducer;
    /** Port the broker is bound to, or {@code -1} until the cluster has been created. */
    private int brokerPort = -1;
    private String brokerConnectionString;

    /**
     * Starts an embedded ZooKeeper and a single Kafka broker on an ephemeral
     * port, storing the broker logs in a fresh temporary directory.
     *
     * @throws IOException if the temporary log directory cannot be created
     */
    public void createKafkaCluster() throws IOException {
        System.setProperty("zookeeper.preAllocSize", Integer.toString(128));
        zkServer = new EmbeddedZookeeper();
        zkConnect = ZK_HOST + ":" + zkServer.port();
        ZkClient zkClient = new ZkClient(zkConnect, SESSION_TIMEOUT, CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
        zkUtils = ZkUtils.apply(zkClient, false);

        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
        // Port 0 lets the OS pick a free port; the actual port is read back below.
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKER_HOST + ":0");
        brokerProps.setProperty("offsets.topic.replication.factor", "1");
        brokerProps.setProperty("offsets.topic.num.partitions", "1");
        brokerProps.setProperty("log.cleaner.dedupe.buffer.size", Long.toString(2 * 1024 * 1024L));
        brokerProps.setProperty("transaction.state.log.replication.factor", "1");
        brokerProps.setProperty("transaction.state.log.num.partitions", "1");
        brokerProps.setProperty("transaction.state.log.min.isr", "1");
        brokerProps.setProperty("transaction.abort.timed.out.transaction.cleanup.interval.ms", "200");
        brokerProps.setProperty("group.initial.rebalance.delay.ms", "0");

        kafkaServer = TestUtils.createServer(new KafkaConfig(brokerProps), new MockTime());
        brokerPort = TestUtils.boundPort(kafkaServer, SecurityProtocol.PLAINTEXT);
        brokerConnectionString = BROKER_HOST + ":" + brokerPort;
    }

    /**
     * Shuts down the broker, the ZooKeeper utilities, both cached producers and
     * the embedded ZooKeeper server. Does nothing if no broker is running.
     *
     * <p>A failure while shutting down ZooKeeper is ignored on Windows and
     * rethrown on every other operating system.
     */
    public void shutdownKafkaCluster() {
        if (kafkaServer == null) {
            return;
        }
        kafkaServer.shutdown();
        zkUtils.close();
        if (producer != null) {
            producer.close();
        }
        // The String/String producer used to be left open here, leaking its I/O thread.
        if (stringStringProducer != null) {
            stringStringProducer.close();
        }
        try {
            zkServer.shutdown();
        } catch (Exception e) {
            if (!isWindows()) {
                throw e;
            }
        }
        producer = null;
        stringStringProducer = null;
        kafkaServer = null;
        zkUtils = null;
        zkServer = null;
    }

    /**
     * @return the {@code host:port} string of the embedded ZooKeeper, or
     *         {@code null} if {@link #createKafkaCluster()} was never called
     */
    public String getZookeeperConnectionString() {
        return zkConnect;
    }

    /**
     * @return the {@code host:port} string of the broker, or {@code null} if
     *         {@link #createKafkaCluster()} was never called
     */
    public String getBrokerConnectionString() {
        return brokerConnectionString;
    }

    /** Returns whether the {@code os.name} system property contains "windows" (case-insensitive). */
    private static boolean isWindows() {
        return StringUtil.lowerCaseInternal(System.getProperty("os.name")).contains("windows");
    }

    /**
     * Creates a topic with a replication factor of 1.
     *
     * @param str name of the topic
     * @param i   number of partitions
     */
    public void createTopic(String str, int i) {
        AdminUtils.createTopic(zkUtils, str, i, 1, new Properties(), RackAwareMode$Disabled$.MODULE$);
    }

    /**
     * Changes the partition count of an existing topic by passing its current
     * replica assignment and the new count to {@code AdminUtils.addPartitions}.
     *
     * @param str name of the topic
     * @param i   new total number of partitions
     */
    // Raw types are deliberate: the Scala collection types involved cannot be
    // expressed with matching Java generics, so the map is handed over unchecked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void setPartitionCount(String str, int i) {
        // Re-key the current assignment from (topic, partition) to the partition number only.
        Map assignmentByPartition = JavaConversions
                .mapAsJavaMap(zkUtils.getReplicaAssignmentForTopics(
                        JavaConversions.asScalaSet(Collections.singleton(str)).toSeq()))
                .entrySet().stream()
                .map(entry -> Util.entry(
                        Integer.valueOf(((TopicAndPartition) entry.getKey()).partition()), entry.getValue()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        AdminUtils.addPartitions(
                zkUtils,
                str,
                JavaConversions.mapAsScalaMap(assignmentByPartition),
                AdminUtils.getBrokerMetadatas(zkUtils, (RackAwareMode) null, Option.apply(zkUtils.getSortedBrokerList())),
                i,
                Option.empty(),
                false);
    }

    /**
     * Sends a record with an {@code Integer} key.
     *
     * @param str  topic name
     * @param num  record key
     * @param str2 record value
     * @return future completed with the metadata of the sent record
     */
    public Future<RecordMetadata> produce(String str, Integer num, String str2) {
        return getProducer().send(new ProducerRecord<>(str, num, str2));
    }

    /**
     * Sends a record with a {@code String} key.
     *
     * @param str  topic name
     * @param str2 record key
     * @param str3 record value
     * @return future completed with the metadata of the sent record
     */
    public Future<RecordMetadata> produce(String str, String str2, String str3) {
        return getStringStringProducer().send(new ProducerRecord<>(str, str2, str3));
    }

    /**
     * Sends a record with an {@code Integer} key to an explicit partition with
     * an explicit timestamp.
     *
     * @param str  topic name
     * @param i    target partition
     * @param l    record timestamp, passed to the record as-is
     * @param num  record key
     * @param str2 record value
     * @return future completed with the metadata of the sent record
     */
    public Future<RecordMetadata> produce(String str, int i, Long l, Integer num, String str2) {
        return getProducer().send(new ProducerRecord<>(str, i, l, num, str2));
    }

    /** Returns the cached Integer/String producer, creating it on first use. */
    private KafkaProducer<Integer, String> getProducer() {
        if (producer == null) {
            producer = newProducer(IntegerSerializer.class, StringSerializer.class);
        }
        return producer;
    }

    /** Returns the cached String/String producer, creating it on first use. */
    private KafkaProducer<String, String> getStringStringProducer() {
        if (stringStringProducer == null) {
            stringStringProducer = newProducer(StringSerializer.class, StringSerializer.class);
        }
        return stringStringProducer;
    }

    /** Creates a producer connected to the embedded broker using the given serializer classes. */
    private <K, V> KafkaProducer<K, V> newProducer(Class<?> keySerializer, Class<?> valueSerializer) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BROKER_HOST + ":" + brokerPort);
        properties.setProperty("key.serializer", keySerializer.getCanonicalName());
        properties.setProperty("value.serializer", valueSerializer.getCanonicalName());
        return new KafkaProducer<>(properties);
    }

    /**
     * Drops the cached Integer/String producer so that the next
     * {@code produce} call with an {@code Integer} key creates a new one.
     *
     * <p>NOTE(review): the dropped producer is not closed here, and the
     * String/String producer is not reset. Confirm both are intentional.
     */
    public void resetProducer() {
        producer = null;
    }

    /**
     * Creates a consumer with {@code Integer} keys and {@code String} values
     * subscribed to the given topics.
     *
     * @param strArr topics to subscribe to
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public KafkaConsumer<Integer, String> createConsumer(String... strArr) {
        return createConsumer(IntegerDeserializer.class, StringDeserializer.class, Collections.emptyMap(), strArr);
    }

    /**
     * Creates a consumer subscribed to the given topics. It uses a random
     * group id, reads from the earliest offset and sees only committed
     * records, unless overridden by {@code map}.
     *
     * @param cls    key deserializer class
     * @param cls2   value deserializer class
     * @param map    extra consumer properties; applied last, so they override the defaults
     * @param strArr topics to subscribe to
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> createConsumer(Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2, Map<String, String> map, String... strArr) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokerConnectionString);
        properties.setProperty("group.id", HazelcastTestSupport.randomString());
        properties.setProperty("client.id", "consumer0");
        properties.setProperty("key.deserializer", cls.getCanonicalName());
        properties.setProperty("value.deserializer", cls2.getCanonicalName());
        properties.setProperty("isolation.level", "read_committed");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.putAll(map);
        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(strArr));
        return consumer;
    }

    /**
     * Consumes the topic until at least {@code map.size()} records have been
     * received, asserting that each record's value equals the expected value
     * for its key. Fails if fewer records than expected arrive within the
     * timeout.
     *
     * @param str topic name
     * @param map expected value for each key
     * @param z   if {@code true}, also asserts that each record's partition equals its key
     */
    public void assertTopicContentsEventually(String str, Map<Integer, String> map, boolean z) {
        try (KafkaConsumer<Integer, String> consumer = createConsumer(str)) {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(ASSERT_TIMEOUT_SECONDS);
            int received = 0;
            // Compare by difference: nanoTime values may overflow, so "<" on raw values is unsafe.
            while (received < map.size() && System.nanoTime() - deadline < 0) {
                for (ConsumerRecord<Integer, String> record : consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MILLIS))) {
                    Assert.assertEquals("key=" + record.key(), map.get(record.key()), record.value());
                    if (z) {
                        Assert.assertEquals(record.key().intValue(), record.partition());
                    }
                    received++;
                }
            }
            assertEnoughRecords(str, map.size(), received);
        }
    }

    /**
     * Same as {@link #assertTopicContentsEventually(String, Map, Class, Class, Map)}
     * with no extra consumer properties.
     *
     * @param str  topic name
     * @param map  expected value for each key
     * @param cls  key deserializer class
     * @param cls2 value deserializer class
     */
    public <K, V> void assertTopicContentsEventually(String str, Map<K, V> map, Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2) {
        assertTopicContentsEventually(str, map, cls, cls2, Collections.emptyMap());
    }

    /**
     * Consumes the topic until {@code map.size()} records have been received,
     * asserting that every key is seen at most once, is present in
     * {@code map} with a non-null value, and carries the expected value.
     * Fails if fewer records than expected arrive within the timeout.
     *
     * @param str  topic name
     * @param map  expected value for each key
     * @param cls  key deserializer class
     * @param cls2 value deserializer class
     * @param map2 extra consumer properties
     */
    public <K, V> void assertTopicContentsEventually(String str, Map<K, V> map, Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2, Map<String, String> map2) {
        try (KafkaConsumer<K, V> consumer = createConsumer(cls, cls2, map2, str)) {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(ASSERT_TIMEOUT_SECONDS);
            HashSet<K> seenKeys = new HashSet<>();
            int received = 0;
            // Compare by difference: nanoTime values may overflow, so "<" on raw values is unsafe.
            while (received < map.size() && System.nanoTime() - deadline < 0) {
                for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MILLIS))) {
                    Assert.assertTrue("key=" + record.key() + " already seen", seenKeys.add(record.key()));
                    V expectedValue = map.get(record.key());
                    Assert.assertNotNull("key=" + record.key() + " received, but not expected", expectedValue);
                    Assert.assertEquals("key=" + record.key(), expectedValue, record.value());
                    received++;
                }
            }
            assertEnoughRecords(str, map.size(), received);
        }
    }

    /** Fails when the polling loop ended on timeout, i.e. fewer records arrived than expected. */
    private static void assertEnoughRecords(String topic, int expectedCount, int receivedCount) {
        Assert.assertTrue("Timed out after " + ASSERT_TIMEOUT_SECONDS + " s waiting for records in topic '" + topic
                + "': expected at least " + expectedCount + ", received " + receivedCount,
                receivedCount >= expectedCount);
    }
}
