package org.apache.atlas.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.MessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/kafka/KafkaConsumer.class */
public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    private final int consumerId;
    private final ConsumerIterator iterator;
    private final ConsumerConnector consumerConnector;
    private final boolean autoCommitEnabled;
    private long lastSeenOffset;

    public KafkaConsumer(MessageDeserializer<T> messageDeserializer, KafkaStream<String, String> kafkaStream, int i, ConsumerConnector consumerConnector, boolean z) {
        super(messageDeserializer);
        this.consumerConnector = consumerConnector;
        this.lastSeenOffset = 0L;
        this.iterator = kafkaStream.iterator();
        this.consumerId = i;
        this.autoCommitEnabled = z;
    }

    @Override // org.apache.atlas.notification.NotificationConsumer
    public boolean hasNext() {
        return this.iterator.hasNext();
    }

    @Override // org.apache.atlas.notification.AbstractNotificationConsumer
    public String getNext() {
        MessageAndMetadata next = this.iterator.next();
        LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}", new Object[]{Integer.valueOf(this.consumerId), next.topic(), Integer.valueOf(next.partition()), Long.valueOf(next.offset()), next.message()});
        this.lastSeenOffset = next.offset();
        return (String) next.message();
    }

    @Override // org.apache.atlas.notification.AbstractNotificationConsumer
    protected String peekMessage() {
        return (String) ((MessageAndMetadata) this.iterator.peek()).message();
    }

    @Override // org.apache.atlas.notification.AbstractNotificationConsumer, org.apache.atlas.notification.NotificationConsumer
    public void commit() {
        if (this.autoCommitEnabled) {
            LOG.debug("Auto commit is disabled, not committing.");
        } else {
            this.consumerConnector.commitOffsets();
            LOG.debug("Committed offset: {}", Long.valueOf(this.lastSeenOffset));
        }
    }

    @Override // org.apache.atlas.notification.NotificationConsumer
    public void close() {
        this.consumerConnector.shutdown();
    }
}
