package org.apache.atlas.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.VerifiableProperties;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

@Singleton
/* loaded from: input_file:org/apache/atlas/kafka/KafkaNotification.class */
public class KafkaNotification extends AbstractNotification implements Service {
    public static final String PROPERTY_PREFIX = "atlas.kafka";
    private static final String ATLAS_KAFKA_DATA = "data";
    public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
    public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
    protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
    private KafkaServer kafkaServer;
    private ServerCnxnFactory factory;
    private Properties properties;
    private KafkaProducer producer;
    private List<ConsumerConnector> consumerConnectors;
    public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
    private static final Map<NotificationInterface.NotificationType, String> TOPIC_MAP = new HashMap<NotificationInterface.NotificationType, String>() { // from class: org.apache.atlas.kafka.KafkaNotification.1
        {
            put(NotificationInterface.NotificationType.HOOK, KafkaNotification.ATLAS_HOOK_TOPIC);
            put(NotificationInterface.NotificationType.ENTITIES, KafkaNotification.ATLAS_ENTITIES_TOPIC);
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/atlas/kafka/KafkaNotification$MessageContext.class */
    public class MessageContext {
        private final Future<RecordMetadata> future;
        private final String message;

        public MessageContext(Future<RecordMetadata> future, String str) {
            this.future = future;
            this.message = str;
        }

        public Future<RecordMetadata> getFuture() {
            return this.future;
        }

        public String getMessage() {
            return this.message;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/atlas/kafka/KafkaNotification$SystemTime.class */
    /** {@link Time} implementation backed by the JVM system clock and {@link Thread#sleep(long)}. */
    public static class SystemTime implements Time {
        private SystemTime() {
        }

        /** @return {@link System#currentTimeMillis()} */
        public long milliseconds() {
            return System.currentTimeMillis();
        }

        /** @return {@link System#nanoTime()} */
        public long nanoseconds() {
            return System.nanoTime();
        }

        /**
         * Sleeps the current thread for the given number of milliseconds.
         *
         * @param j sleep duration in milliseconds
         * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
         *         interrupted while sleeping; the interrupt flag is restored first
         */
        public void sleep(long j) {
            try {
                Thread.sleep(j);
            } catch (InterruptedException e) {
                // Thread.sleep clears the flag when it throws; restore it so callers further up
                // the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Looks up the Kafka topic registered for a notification type.
     *
     * @param notificationType the notification type
     * @return the topic name from TOPIC_MAP, or null when the type has no entry
     */
    @VisibleForTesting
    String getTopicName(NotificationInterface.NotificationType notificationType) {
        final String topicName = TOPIC_MAP.get(notificationType);
        return topicName;
    }

    public KafkaNotification(Configuration configuration) throws AtlasException {
        super(configuration);
        this.producer = null;
        this.consumerConnectors = new ArrayList();
        this.properties = ConfigurationConverter.getProperties(ApplicationProperties.getSubsetConfiguration(configuration, PROPERTY_PREFIX));
        this.properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.properties.put("partition.assignment.strategy", "roundrobin");
        this.properties.put("auto.offset.reset", "smallest");
    }

    /**
     * Test-only constructor that uses the supplied properties as-is, without adding any defaults.
     *
     * @param properties the Kafka properties to use
     */
    @VisibleForTesting
    protected KafkaNotification(Properties properties) {
        super();
        this.properties = properties;
        this.consumerConnectors = new ArrayList<>();
        this.producer = null;
    }

    /**
     * Starts the embedded ZooKeeper and Kafka servers, in that order, when isEmbedded() reports
     * true and isHAEnabled() reports false. Does nothing otherwise, apart from logging in the HA
     * case.
     *
     * @throws AtlasException wrapping any exception raised while starting either server
     */
    public void start() throws AtlasException {
        if (isHAEnabled()) {
            LOG.info("Not starting embedded instances when HA is enabled.");
            return;
        }
        if (!isEmbedded()) {
            return;
        }
        try {
            startZk();
            startKafka();
        } catch (Exception cause) {
            throw new AtlasException("Failed to start embedded kafka", cause);
        }
    }

    /**
     * Shuts down the embedded Kafka broker and then the embedded ZooKeeper connection factory,
     * skipping whichever was never started.
     *
     * <p>The ZooKeeper factory is shut down in a {@code finally} block so that a failure while
     * stopping Kafka does not leave ZooKeeper running.
     */
    public void stop() {
        try {
            if (this.kafkaServer != null) {
                this.kafkaServer.shutdown();
            }
        } finally {
            if (this.factory != null) {
                this.factory.shutdown();
            }
        }
    }

    /**
     * Creates consumers for the given notification type, taking the auto-commit flag from the
     * "auto.commit.enable" property ("true" when the property is absent).
     *
     * @param notificationType the notification type to consume
     * @param i                the number of consumer connectors to create
     * @return the created consumers
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int i) {
        final String autoCommitSetting = this.properties.getProperty("auto.commit.enable", "true");
        final boolean autoCommitEnabled = Boolean.parseBoolean(autoCommitSetting);
        return createConsumers(notificationType, i, autoCommitEnabled);
    }

    /**
     * Creates {@code i} consumer connectors for the topic mapped to the notification type and
     * wraps every stream they return in a consumer.
     *
     * <p>Each connector requests a single stream for the topic. A connector is registered for
     * {@link #close()} as soon as it is created, so it is still shut down if creating its streams
     * or consumers fails afterwards.
     *
     * @param notificationType the notification type to consume
     * @param i                the number of consumer connectors to create
     * @param z                whether the created consumers auto-commit
     * @return the created consumers
     * @throws IllegalStateException if no consumer group id is configured for the type
     */
    @VisibleForTesting
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int i, boolean z) {
        String topic = TOPIC_MAP.get(notificationType);
        Properties consumerProperties = getConsumerProperties(notificationType);
        List<NotificationConsumer<T>> consumers = new ArrayList<>(i);
        for (int consumerId = 0; consumerId < i; consumerId++) {
            ConsumerConnector connector = createConsumerConnector(consumerProperties);
            // Track the connector before doing anything that can throw, otherwise it would leak.
            this.consumerConnectors.add(connector);

            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(topic, 1);
            StringDecoder decoder = new StringDecoder((VerifiableProperties) null);
            Map<String, List<KafkaStream<String, String>>> streamsByTopic =
                    connector.createMessageStreams(topicCountMap, decoder, decoder);

            for (KafkaStream<String, String> stream : streamsByTopic.get(topic)) {
                KafkaConsumer<T> consumer = createKafkaConsumer(notificationType.getClassType(),
                        notificationType.getDeserializer(), stream, consumerId, connector, z);
                consumers.add(consumer);
            }
        }
        return consumers;
    }

    /**
     * Closes and discards the producer if one was created, then shuts down every tracked
     * consumer connector and empties the tracking list.
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public void close() {
        final boolean hasProducer = this.producer != null;
        if (hasProducer) {
            this.producer.close();
            this.producer = null;
        }
        for (ConsumerConnector connector : this.consumerConnectors) {
            connector.shutdown();
        }
        this.consumerConnectors.clear();
    }

    /**
     * Sends the messages to the topic of the given notification type, creating the producer on
     * first use.
     *
     * @param notificationType the notification type, which selects the topic
     * @param strArr           the messages to send
     * @throws NotificationException if any message could not be sent
     */
    @Override // org.apache.atlas.notification.AbstractNotification
    public void sendInternal(NotificationInterface.NotificationType notificationType, String... strArr) throws NotificationException {
        final boolean producerMissing = (this.producer == null);
        if (producerMissing) {
            createProducer();
        }
        sendInternalToProducer(this.producer, notificationType, strArr);
    }

    /**
     * Hands every message to the producer, then waits for each send to complete.
     *
     * <p>All messages are submitted before any result is awaited. Messages whose send failed are
     * collected and reported together; the exception carried is the last failure seen.
     *
     * <p>If the calling thread is interrupted while waiting, the affected message is counted as
     * failed, waiting continues for the remaining messages, and the thread's interrupt flag is
     * restored before this method returns or throws.
     *
     * @param producer         the producer to send with
     * @param notificationType the notification type, which selects the topic
     * @param strArr           the messages to send
     * @throws NotificationException if at least one send failed; carries the failed messages
     */
    @VisibleForTesting
    void sendInternalToProducer(Producer producer, NotificationInterface.NotificationType notificationType, String[] strArr) throws NotificationException {
        String topic = TOPIC_MAP.get(notificationType);
        List<MessageContext> pending = new ArrayList<>(strArr.length);
        for (String message : strArr) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
            LOG.debug("Sending message for topic {}: {}", topic, message);
            pending.add(new MessageContext(producer.send(record), message));
        }

        List<String> failedMessages = new ArrayList<>();
        Exception lastFailure = null;
        boolean interrupted = false;
        for (MessageContext context : pending) {
            try {
                RecordMetadata metadata = context.getFuture().get();
                LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", new Object[]{metadata.topic(), metadata.partition(), metadata.offset()});
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Remember the interruption but defer restoring the flag: setting it now
                    // would make every remaining get() fail immediately.
                    interrupted = true;
                }
                lastFailure = e;
                failedMessages.add(context.getMessage());
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (lastFailure != null) {
            throw new NotificationException(lastFailure, failedMessages);
        }
    }

    /**
     * Creates a Kafka consumer connector from the given consumer properties.
     *
     * @param properties the consumer configuration
     * @return a new consumer connector
     */
    protected ConsumerConnector createConsumerConnector(Properties properties) {
        final ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        return Consumer.createJavaConsumerConnector(consumerConfig);
    }

    /**
     * Wraps a Kafka stream in a {@code KafkaConsumer}.
     *
     * @param cls                 the message class; not used by this implementation
     * @param messageDeserializer deserializer for incoming messages
     * @param kafkaStream         the stream to read from
     * @param i                   the consumer id
     * @param consumerConnector   the connector that produced the stream
     * @param z                   whether the consumer auto-commits
     * @return the new consumer
     */
    protected <T> KafkaConsumer<T> createKafkaConsumer(Class<T> cls, MessageDeserializer<T> messageDeserializer, KafkaStream kafkaStream, int i, ConsumerConnector consumerConnector, boolean z) {
        final KafkaConsumer<T> consumer =
                new KafkaConsumer<T>(messageDeserializer, kafkaStream, i, consumerConnector, z);
        return consumer;
    }

    /**
     * Builds the consumer properties for a notification type: a copy of this instance's
     * properties with "group.id" set from the type-specific
     * {@code <type in lower case>.group.id} property.
     *
     * @param notificationType the notification type
     * @return a new Properties object for a consumer of that type
     * @throws IllegalStateException if the type-specific group id property is not set
     */
    private Properties getConsumerProperties(NotificationInterface.NotificationType notificationType) {
        // Locale.ROOT keeps the key stable: with the default locale, "ENTITIES" lower-cases to
        // "entıtıes" (dotless i) under a Turkish locale and the lookup would fail.
        String groupIdKey = notificationType.toString().toLowerCase(Locale.ROOT) + "." + CONSUMER_GROUP_ID_PROPERTY;
        String groupId = this.properties.getProperty(groupIdKey);
        if (groupId == null) {
            throw new IllegalStateException("No configuration group id set for the notification type " + notificationType);
        }
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(this.properties);
        consumerProperties.put(CONSUMER_GROUP_ID_PROPERTY, groupId);
        LOG.info("Consumer property: auto.commit.enable: {}", consumerProperties.getProperty("auto.commit.enable"));
        return consumerProperties;
    }

    /**
     * Returns the directory {@code str} under the directory named by the "data" property,
     * creating it (and any missing parents) if necessary.
     *
     * @param str the directory path relative to the data directory
     * @return the existing or newly created directory
     * @throws RuntimeException if the path is not a directory and cannot be created as one
     */
    private File constructDir(String str) {
        File dir = new File(this.properties.getProperty(ATLAS_KAFKA_DATA), str);
        // isDirectory() rather than exists(): a regular file at this path is not usable.
        // The trailing isDirectory() covers mkdirs() returning false because the directory
        // appeared between the first check and the mkdirs() call.
        if (dir.isDirectory() || dir.mkdirs() || dir.isDirectory()) {
            return dir;
        }
        throw new RuntimeException("could not create temp directory: " + dir.getAbsolutePath());
    }

    /** Creates the Kafka producer from this instance's properties unless one already exists. */
    private synchronized void createProducer() {
        if (this.producer != null) {
            return;
        }
        this.producer = new KafkaProducer(this.properties);
    }

    /**
     * Parses the given string as a URL, retrying with an "http://" prefix when it cannot be
     * parsed as given (for example a bare {@code host:port}).
     *
     * @param str the URL or host:port string
     * @return the parsed URL
     * @throws MalformedURLException if the string cannot be parsed even with the prefix
     */
    private URL getURL(String str) throws MalformedURLException {
        URL parsed;
        try {
            parsed = new URL(str);
        } catch (MalformedURLException notParsableAsGiven) {
            parsed = new URL("http://" + str);
        }
        return parsed;
    }

    /**
     * Starts an embedded ZooKeeper server bound to the host and port taken from the
     * "zookeeper.connect" property.
     *
     * @return the string form of the address the connection factory is bound to
     * @throws IOException          declared by the original signature
     * @throws InterruptedException declared by the original signature
     * @throws URISyntaxException   declared by the original signature
     */
    private String startZk() throws IOException, InterruptedException, URISyntaxException {
        final String zkConnect = this.properties.getProperty("zookeeper.connect");
        LOG.debug("Starting zookeeper at {}", zkConnect);

        final URL zkUrl = getURL(zkConnect);
        final InetSocketAddress bindAddress = new InetSocketAddress(zkUrl.getHost(), zkUrl.getPort());
        this.factory = NIOServerCnxnFactory.createFactory(bindAddress, 1024);

        // NOTE(review): ZooKeeperServer's first two constructor arguments appear to be the
        // snapshot dir followed by the txn-log dir, which would make "zk/txn" and "zk/snap"
        // swapped here. Order kept as-is so existing on-disk data stays where it is; confirm.
        final File firstDir = constructDir("zk/txn");
        final File secondDir = constructDir("zk/snap");
        final ZooKeeperServer zkServer = new ZooKeeperServer(firstDir, secondDir, 500);
        this.factory.startup(zkServer);

        return this.factory.getLocalAddress().getAddress().toString();
    }

    /**
     * Starts an embedded Kafka broker whose host and port come from the "bootstrap.servers"
     * property and whose log directory is the "kafka" directory under the data directory.
     *
     * <p>The broker configuration is built on a copy of this instance's properties, so the
     * broker-only settings added here (broker.id, host.name, port, log.dirs,
     * log.flush.interval.messages) do not end up in the properties later used to configure
     * the producer and the consumers.
     *
     * @throws IOException        declared by the original signature
     * @throws URISyntaxException declared by the original signature
     */
    private void startKafka() throws IOException, URISyntaxException {
        String bootstrapServers = this.properties.getProperty("bootstrap.servers");
        LOG.debug("Starting kafka at {}", bootstrapServers);
        URL brokerUrl = getURL(bootstrapServers);

        // Copy instead of aliasing this.properties, so client configuration is left untouched.
        Properties brokerConfig = new Properties();
        brokerConfig.putAll(this.properties);
        brokerConfig.setProperty("broker.id", "1");
        brokerConfig.setProperty("host.name", brokerUrl.getHost());
        brokerConfig.setProperty("port", String.valueOf(brokerUrl.getPort()));
        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));

        this.kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(getClass().getName()));
        this.kafkaServer.startup();
        LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
    }
}
