package kieker.analysis.plugin.reader.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.newio.AbstractRawDataReader;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.filesystem.FSUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager;

@Plugin(description = "A plugin that reads monitoring records from a Kafka topic", outputPorts = {@OutputPort(name = "monitoringRecords", eventTypes = {IMonitoringRecord.class}, description = "Output port of the Kafka reader")}, configuration = {@Property(name = KafkaReader.CONFIG_PROPERTY_TOPIC_NAME, defaultValue = "kiekerRecords", description = "Name of the Kafka topic to read the records from"), @Property(name = KafkaReader.CONFIG_PROPERTY_BOOTSTRAP_SERVERS, defaultValue = "localhost:9092", description = "Bootstrap servers for the Kafka cluster"), @Property(name = KafkaReader.CONFIG_PROPERTY_GROUP_ID, defaultValue = FSUtil.FILE_PREFIX, description = "Group ID for the Kafka consumer group"), @Property(name = KafkaReader.CONFIG_PROPERTY_AUTO_COMMIT, defaultValue = "true", description = "Auto-commit the current position?"), @Property(name = KafkaReader.CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS, defaultValue = "1000", description = "Auto commit interval in milliseconds"), @Property(name = KafkaReader.CONFIG_PROPERTY_SESSION_TIMEOUT_MS, defaultValue = KafkaManager.DEFAULT_TIMEOUT_MILLIS, description = "Session timeout interval in milliseconds")})
/* loaded from: input_file:kieker/analysis/plugin/reader/kafka/KafkaReader.class */
public class KafkaReader extends AbstractRawDataReader {
    public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
    public static final String CONFIG_PROPERTY_DESERIALIZER = "deserializer";
    public static final String CONFIG_PROPERTY_TOPIC_NAME = "topicName";
    public static final String CONFIG_PROPERTY_BOOTSTRAP_SERVERS = "bootstrapServers";
    public static final String CONFIG_PROPERTY_GROUP_ID = "groupId";
    public static final String CONFIG_PROPERTY_AUTO_COMMIT = "autoCommit";
    public static final String CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS = "autoCommitIntervalMs";
    public static final String CONFIG_PROPERTY_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
    private final String topicName;
    private final String bootstrapServers;
    private final String groupId;
    private final boolean enableAutoCommit;
    private final int autoCommitIntervalMs;
    private final int sessionTimeoutMs;
    private KafkaConsumer<String, byte[]> consumer;
    private volatile boolean terminated;

    public KafkaReader(Configuration configuration, IProjectContext iProjectContext) {
        super(configuration, iProjectContext, configuration.getStringProperty("deserializer"));
        this.terminated = false;
        this.topicName = configuration.getStringProperty(CONFIG_PROPERTY_TOPIC_NAME);
        this.bootstrapServers = configuration.getStringProperty(CONFIG_PROPERTY_BOOTSTRAP_SERVERS);
        this.groupId = configuration.getStringProperty(CONFIG_PROPERTY_GROUP_ID);
        this.enableAutoCommit = configuration.getBooleanProperty(CONFIG_PROPERTY_AUTO_COMMIT);
        this.autoCommitIntervalMs = configuration.getIntProperty(CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS);
        this.sessionTimeoutMs = configuration.getIntProperty(CONFIG_PROPERTY_SESSION_TIMEOUT_MS);
    }

    @Override // kieker.analysis.analysisComponent.AbstractAnalysisComponent, kieker.analysis.analysisComponent.IAnalysisComponent
    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty(CONFIG_PROPERTY_TOPIC_NAME, this.topicName);
        configuration.setProperty(CONFIG_PROPERTY_BOOTSTRAP_SERVERS, this.bootstrapServers);
        if (this.groupId != null) {
            configuration.setProperty(CONFIG_PROPERTY_GROUP_ID, this.groupId);
        }
        configuration.setProperty(CONFIG_PROPERTY_AUTO_COMMIT, this.enableAutoCommit);
        configuration.setProperty(CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS, this.autoCommitIntervalMs);
        configuration.setProperty(CONFIG_PROPERTY_SESSION_TIMEOUT_MS, this.sessionTimeoutMs);
        return configuration;
    }

    @Override // kieker.analysis.plugin.reader.AbstractReaderPlugin, kieker.analysis.plugin.IPlugin
    public boolean init() {
        if (!super.init()) {
            return false;
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("group.id", this.groupId);
        properties.put("enable.auto.commit", Boolean.valueOf(this.enableAutoCommit));
        properties.put("auto.commit.interval.ms", Integer.valueOf(this.autoCommitIntervalMs));
        properties.put("session.timeout.ms", Integer.valueOf(this.sessionTimeoutMs));
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
        return true;
    }

    @Override // kieker.analysis.plugin.reader.IReaderPlugin
    public boolean read() {
        this.consumer.subscribe(Arrays.asList(this.topicName));
        while (!this.terminated) {
            try {
                processRecords(this.consumer.poll(100L));
            } finally {
                this.consumer.close();
            }
        }
        return true;
    }

    private void processRecords(ConsumerRecords<String, byte[]> consumerRecords) {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            decodeAndDeliverRecords((byte[]) ((ConsumerRecord) it.next()).value(), "monitoringRecords");
        }
    }

    @Override // kieker.analysis.plugin.IPlugin
    public void terminate(boolean z) {
        this.terminated = true;
    }
}
