package moa.streams;

import com.github.javacliparser.StringOption;
import com.yahoo.labs.samoa.instances.Instance;
import com.yahoo.labs.samoa.instances.InstancesHeader;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import moa.capabilities.CapabilitiesHandler;
import moa.capabilities.Capability;
import moa.capabilities.ImmutableCapabilities;
import moa.core.Example;
import moa.core.InstanceExample;
import moa.core.ObjectRepository;
import moa.options.AbstractOptionHandler;
import moa.tasks.TaskMonitor;
import moa.util.KafkaUtils;
import moa.util.ObjectDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;

/* loaded from: input_file:moa/streams/KafkaStream.class */
public class KafkaStream extends AbstractOptionHandler implements InstanceStream, CapabilitiesHandler, Closeable {
    private static final long serialVersionUID = 671271388371039247L;
    public StringOption topicOption = new StringOption("topic", 't', "Kafka topic to consume", "");
    public StringOption hostOption = new StringOption("host", 'h', "The Kafka broker host", "localhost");
    public StringOption portOption = new StringOption("port", 'p', "The Kafka broker port", "9092");
    protected transient KafkaConsumer<Long, Instance> m_Consumer = null;
    protected transient Queue<Instance> m_InstanceBuffer = null;
    protected transient boolean m_EndOfStreamReached = false;
    protected transient InstancesHeader m_Header = null;

    public String getPurposeString() {
        return "A stream consumed from a Kafka topic.";
    }

    protected void prepareForUseImpl(TaskMonitor taskMonitor, ObjectRepository objectRepository) {
        restart();
    }

    public InstancesHeader getHeader() {
        fillBufferIfNecessary();
        return this.m_Header;
    }

    public long estimatedRemainingInstances() {
        fillBufferIfNecessary();
        if (this.m_EndOfStreamReached) {
            return this.m_InstanceBuffer.size();
        }
        return -1L;
    }

    public boolean hasMoreInstances() {
        fillBufferIfNecessary();
        return (this.m_EndOfStreamReached && bufferIsEmpty()) ? false : true;
    }

    public Example<Instance> nextInstance() {
        fillBufferIfNecessary();
        if (bufferIsEmpty()) {
            return null;
        }
        return new InstanceExample(this.m_InstanceBuffer.remove());
    }

    public boolean isRestartable() {
        return true;
    }

    public void restart() {
        restartConsumer();
        this.m_InstanceBuffer = null;
        this.m_EndOfStreamReached = false;
    }

    public ImmutableCapabilities defineImmutableCapabilities() {
        return new ImmutableCapabilities(new Capability[]{Capability.VIEW_EXPERIMENTAL, Capability.VIEW_STANDARD});
    }

    public void getDescription(StringBuilder sb, int i) {
        while (i > 0) {
            sb.append(" ");
            i--;
        }
        sb.append("Kafka instance stream consuming topic '");
        sb.append(this.topicOption.getValue());
        sb.append("' from broker at ");
        sb.append(broker());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.m_Consumer != null) {
            this.m_Consumer.unsubscribe();
            this.m_Consumer.close();
        }
        this.m_Consumer = null;
    }

    protected void establishConsumer() {
        if (this.m_Consumer != null) {
            return;
        }
        this.m_Consumer = new KafkaConsumer<>(createConsumerConfiguration());
        this.m_Consumer.subscribe(Collections.singletonList(this.topicOption.getValue()));
        restartConsumer();
    }

    protected Map<String, Object> createConsumerConfiguration() {
        HashMap hashMap = new HashMap();
        hashMap.put("key.deserializer", LongDeserializer.class);
        hashMap.put("value.deserializer", ObjectDeserializer.class);
        hashMap.put("bootstrap.servers", broker());
        hashMap.put("fetch.min.bytes", 1);
        hashMap.put("group.id", KafkaUtils.uniqueGroupIDString(this));
        hashMap.put("max.partition.fetch.bytes", 1048576);
        hashMap.put("allow.auto.create.topics", false);
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("enable.auto.commit", true);
        hashMap.put("fetch.max.bytes", 16777216);
        hashMap.put("isolation.level", "read_committed");
        hashMap.put("client.id", getClass().getName());
        return hashMap;
    }

    protected String broker() {
        return KafkaUtils.broker(this.hostOption.getValue(), this.portOption.getValue());
    }

    protected void restartConsumer() {
        if (this.m_Consumer == null) {
            return;
        }
        this.m_Consumer.seekToBeginning(Collections.emptyList());
    }

    protected List<TopicPartition> getPartitions() {
        LinkedList linkedList = new LinkedList();
        if (this.m_Consumer == null) {
            return linkedList;
        }
        for (Map.Entry entry : this.m_Consumer.listTopics().entrySet()) {
            String str = (String) entry.getKey();
            Iterator it = ((List) entry.getValue()).iterator();
            while (it.hasNext()) {
                linkedList.add(new TopicPartition(str, ((PartitionInfo) it.next()).partition()));
            }
        }
        return linkedList;
    }

    protected void fillBufferIfNecessary() {
        if (!this.m_EndOfStreamReached && bufferIsEmpty()) {
            establishConsumer();
            if (this.m_InstanceBuffer == null) {
                this.m_InstanceBuffer = new LinkedList();
            }
            Iterator it = this.m_Consumer.poll(KafkaUtils.WAIT_AS_LONG_AS_POSSIBLE).iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (((Instance) consumerRecord.value()) == null) {
                    this.m_EndOfStreamReached = true;
                    close();
                    break;
                }
                this.m_InstanceBuffer.add((Instance) consumerRecord.value());
            }
            cacheHeaderIfNecessary();
        }
    }

    protected void cacheHeaderIfNecessary() {
        Instance peek;
        if (this.m_Header == null && (peek = this.m_InstanceBuffer.peek()) != null) {
            InstancesHeader dataset = peek.dataset();
            if (dataset instanceof InstancesHeader) {
                this.m_Header = dataset;
            } else {
                this.m_Header = new InstancesHeader(dataset);
            }
        }
    }

    protected boolean bufferIsEmpty() {
        return this.m_InstanceBuffer == null || this.m_InstanceBuffer.peek() == null;
    }
}
