package org.apache.atlas.notification;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer.class */
public class NotificationHookConsumer implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
    public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
    public static final int SERVER_READY_WAIT_TIME_MS = 1000;

    @Inject
    private NotificationInterface notificationInterface;
    private ExecutorService executors;
    private AtlasClient atlasClient;

    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$HookConsumer.class */
    class HookConsumer implements Runnable {
        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
        private final AtlasClient client;

        public HookConsumer(NotificationHookConsumer notificationHookConsumer, NotificationConsumer<HookNotification.HookNotificationMessage> notificationConsumer) {
            this(notificationHookConsumer.atlasClient, notificationConsumer);
        }

        public HookConsumer(AtlasClient atlasClient, NotificationConsumer<HookNotification.HookNotificationMessage> notificationConsumer) {
            this.client = atlasClient;
            this.consumer = notificationConsumer;
        }

        private boolean hasNext() {
            try {
                return this.consumer.hasNext();
            } catch (ConsumerTimeoutException e) {
                return false;
            }
        }

        /* JADX WARN: Failed to build post-dominance tree
        java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 5
        	at jadx.core.dex.visitors.blocks.DominatorTree.build(DominatorTree.java:68)
        	at jadx.core.dex.visitors.blocks.PostDominatorTree.compute(PostDominatorTree.java:32)
        	at jadx.core.dex.visitors.blocks.BlockProcessor.processBlocksTree(BlockProcessor.java:73)
        	at jadx.core.dex.visitors.blocks.BlockProcessor.visit(BlockProcessor.java:44)
         */
        /* JADX WARN: Failed to find 'out' block for switch in B:11:0x002e. Please report as an issue. */
        @Override // java.lang.Runnable
        public void run() {
            if (!serverAvailable(new Timer())) {
                return;
            }
            while (true) {
                try {
                    if (hasNext()) {
                        HookNotification.HookNotificationMessage next = this.consumer.next();
                        try {
                            switch (next.getType()) {
                                case ENTITY_CREATE:
                                    NotificationHookConsumer.this.atlasClient.createEntity(((HookNotification.EntityCreateRequest) next).getEntities());
                                    break;
                                case ENTITY_PARTIAL_UPDATE:
                                    HookNotification.EntityPartialUpdateRequest entityPartialUpdateRequest = (HookNotification.EntityPartialUpdateRequest) next;
                                    NotificationHookConsumer.this.atlasClient.updateEntity(entityPartialUpdateRequest.getTypeName(), entityPartialUpdateRequest.getAttribute(), entityPartialUpdateRequest.getAttributeValue(), entityPartialUpdateRequest.getEntity());
                                    break;
                                case ENTITY_FULL_UPDATE:
                                    NotificationHookConsumer.this.atlasClient.updateEntities(((HookNotification.EntityUpdateRequest) next).getEntities());
                                    break;
                            }
                        } catch (Exception e) {
                            NotificationHookConsumer.LOG.warn("Error handling message {}", next, e);
                        }
                    }
                } catch (Throwable th) {
                    NotificationHookConsumer.LOG.warn("Failure in NotificationHookConsumer", th);
                }
            }
        }

        boolean serverAvailable(Timer timer) {
            while (!this.client.isServerReady()) {
                try {
                    try {
                        NotificationHookConsumer.LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", Integer.valueOf(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS));
                        timer.sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
                    } catch (InterruptedException e) {
                        NotificationHookConsumer.LOG.info("Interrupted while waiting for Atlas Server to become ready, exiting consumer thread.", e);
                        return false;
                    }
                } catch (Throwable th) {
                    NotificationHookConsumer.LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", th);
                    return false;
                }
            }
            NotificationHookConsumer.LOG.info("Atlas Server is ready, can start reading Kafka events.");
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$Timer.class */
    public static class Timer {
        Timer() {
        }

        public void sleep(int i) throws InterruptedException {
            Thread.sleep(i);
        }
    }

    public void start() throws AtlasException {
        Configuration configuration = ApplicationProperties.get();
        this.atlasClient = new AtlasClient(configuration.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"));
        List createConsumers = this.notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, configuration.getInt(CONSUMER_THREADS_PROPERTY, 1));
        this.executors = Executors.newFixedThreadPool(createConsumers.size());
        Iterator it = createConsumers.iterator();
        while (it.hasNext()) {
            this.executors.submit(new HookConsumer(this, (NotificationConsumer) it.next()));
        }
    }

    public void stop() {
        this.notificationInterface.close();
        try {
            if (this.executors != null && !this.executors.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            LOG.error("Failure in shutting down consumers");
        }
    }
}
