package moa.tasks;

import com.github.javacliparser.IntOption;
import com.github.javacliparser.StringOption;
import com.yahoo.labs.samoa.instances.Instance;
import java.util.HashMap;
import java.util.Map;
import moa.capabilities.CapabilitiesHandler;
import moa.capabilities.Capability;
import moa.capabilities.ImmutableCapabilities;
import moa.core.ObjectRepository;
import moa.options.ClassOption;
import moa.streams.InstanceStream;
import moa.util.KafkaUtils;
import moa.util.ObjectSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;

/* loaded from: input_file:moa/tasks/WriteToTopicTask.class */
public class WriteToTopicTask extends AuxiliarMainTask implements CapabilitiesHandler {
    public ClassOption streamOption = new ClassOption("stream", 's', "Stream to write to the topic", InstanceStream.class, "generators.RandomTreeGenerator");
    public IntOption maxInstancesOption = new IntOption("maxInstances", 'm', "Maximum number of instances to write", 100000000, 0, Integer.MAX_VALUE);
    public StringOption topicOption = new StringOption("topic", 't', "Kafka topic to write to", "");
    public StringOption hostOption = new StringOption("host", 'h', "The Kafka broker host", "localhost");
    public StringOption portOption = new StringOption("port", 'p', "The Kafka broker port", "");

    protected Map<String, Object> getProducerConfig(String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.put("key.serializer", LongSerializer.class);
        hashMap.put("value.serializer", ObjectSerializer.class);
        hashMap.put("bootstrap.servers", KafkaUtils.broker(str, str2));
        hashMap.put("fetch.min.bytes", 1);
        hashMap.put("group.id", KafkaUtils.uniqueGroupIDString(this));
        hashMap.put("max.partition.fetch.bytes", 1048576);
        hashMap.put("allow.auto.create.topics", false);
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("enable.auto.commit", true);
        hashMap.put("fetch.max.bytes", 16777216);
        hashMap.put("isolation.level", "read_committed");
        hashMap.put("client.id", getClass().getName());
        return hashMap;
    }

    protected Object doMainTask(TaskMonitor taskMonitor, ObjectRepository objectRepository) {
        InstanceStream instanceStream = (InstanceStream) getPreparedClassOption(this.streamOption);
        int value = this.maxInstancesOption.getValue();
        String value2 = this.topicOption.getValue();
        KafkaProducer kafkaProducer = new KafkaProducer(getProducerConfig(this.hostOption.getValue(), this.portOption.getValue()));
        int i = 0;
        while (i < value && instanceStream.hasMoreInstances()) {
            int i2 = i;
            i++;
            kafkaProducer.send(new ProducerRecord(value2, Long.valueOf(i2), (Instance) instanceStream.nextInstance().getData()));
            if (taskMonitor.isCancelled()) {
                return null;
            }
            taskMonitor.setCurrentActivityFractionComplete(i / (instanceStream.estimatedRemainingInstances() >= 0 ? i + r0 : value));
        }
        kafkaProducer.send(new ProducerRecord(value2, Long.valueOf(i), (Object) null));
        return null;
    }

    public Class<?> getTaskResultType() {
        return null;
    }

    public ImmutableCapabilities defineImmutableCapabilities() {
        return new ImmutableCapabilities(new Capability[]{Capability.VIEW_STANDARD});
    }
}
