package org.apache.tez.dag.app;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/tez/dag/app/TaskCommunicatorManager.class */
public class TaskCommunicatorManager extends AbstractService implements TaskCommunicatorManagerInterface {
    private final AppContext context;
    private final TaskCommunicator[] taskCommunicators;
    private final TaskCommunicatorContext[] taskCommunicatorContexts;
    protected final ServicePluginLifecycleAbstractService[] taskCommunicatorServiceWrappers;
    protected final TaskHeartbeatHandler taskHeartbeatHandler;
    protected final ContainerHeartbeatHandler containerHeartbeatHandler;
    private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE;
    private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts;
    private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers;
    private static final Logger LOG = LoggerFactory.getLogger(TaskCommunicatorManager.class);
    private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/TaskCommunicatorManager$ContainerInfo.class */
    public static final class ContainerInfo {
        TezTaskAttemptID taskAttemptId;

        ContainerInfo(TezTaskAttemptID tezTaskAttemptID) {
            this.taskAttemptId = tezTaskAttemptID;
        }
    }

    public TaskCommunicatorManager(AppContext appContext, TaskHeartbeatHandler taskHeartbeatHandler, ContainerHeartbeatHandler containerHeartbeatHandler, List<NamedEntityDescriptor> list) {
        super(TaskCommunicatorManager.class.getName());
        this.RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
        this.registeredAttempts = new ConcurrentHashMap();
        this.registeredContainers = new ConcurrentHashMap();
        this.context = appContext;
        this.taskHeartbeatHandler = taskHeartbeatHandler;
        this.containerHeartbeatHandler = containerHeartbeatHandler;
        Preconditions.checkArgument((list == null || list.isEmpty()) ? false : true, "TaskCommunicators must be specified");
        this.taskCommunicators = new TaskCommunicator[list.size()];
        this.taskCommunicatorContexts = new TaskCommunicatorContext[list.size()];
        this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[list.size()];
        for (int i = 0; i < list.size(); i++) {
            this.taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(appContext, this, list.get(i).getUserPayload(), i);
            this.taskCommunicators[i] = createTaskCommunicator(list.get(i), i);
            this.taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(this.taskCommunicators[i]);
        }
    }

    public void serviceStart() {
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            this.taskCommunicatorServiceWrappers[i].init(getConfig());
            this.taskCommunicatorServiceWrappers[i].start();
        }
    }

    public void serviceStop() {
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            this.taskCommunicatorServiceWrappers[i].stop();
        }
    }

    @VisibleForTesting
    TaskCommunicator createTaskCommunicator(NamedEntityDescriptor namedEntityDescriptor, int i) {
        return namedEntityDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName()) ? createDefaultTaskCommunicator(this.taskCommunicatorContexts[i]) : namedEntityDescriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName()) ? createUberTaskCommunicator(this.taskCommunicatorContexts[i]) : createCustomTaskCommunicator(this.taskCommunicatorContexts[i], namedEntityDescriptor);
    }

    @VisibleForTesting
    TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        LOG.info("Using Default Task Communicator");
        return new TezTaskCommunicatorImpl(taskCommunicatorContext);
    }

    @VisibleForTesting
    TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        LOG.info("Using Default Local Task Communicator");
        return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
    }

    @VisibleForTesting
    TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, NamedEntityDescriptor namedEntityDescriptor) {
        LOG.info("Using TaskCommunicator {}:{} " + namedEntityDescriptor.getEntityName(), namedEntityDescriptor.getClassName());
        try {
            return (TaskCommunicator) ReflectionUtils.getClazz(namedEntityDescriptor.getClassName()).getConstructor(TaskCommunicatorContext.class).newInstance(taskCommunicatorContext);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new TezUncheckedException(e);
        }
    }

    public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest taskHeartbeatRequest) throws IOException, TezException {
        ContainerId containerId = ConverterUtils.toContainerId(taskHeartbeatRequest.getContainerIdentifier());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received heartbeat from container, request=" + taskHeartbeatRequest);
        }
        if (!this.registeredContainers.containsKey(containerId)) {
            LOG.warn("Received task heartbeat from unknown container with id: " + containerId + ", asking it to die");
            return this.RESPONSE_SHOULD_DIE;
        }
        pingContainerHeartbeatHandler(containerId);
        TaskAttemptEventInfo taskAttemptEventInfo = new TaskAttemptEventInfo(0, null, 0);
        TezTaskAttemptID taskAttemptId = taskHeartbeatRequest.getTaskAttemptId();
        if (taskAttemptId != null) {
            ContainerId containerId2 = this.registeredAttempts.get(taskAttemptId);
            if (containerId2 == null || !containerId2.equals(containerId)) {
                LOG.info("Attempt: " + taskAttemptId + " is not recognized for heartbeats");
                return this.RESPONSE_SHOULD_DIE;
            }
            List<TezEvent> events = taskHeartbeatRequest.getEvents();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ping from " + taskAttemptId.toString() + " events: " + (events != null ? events.size() : -1));
            }
            long time = this.context.getClock().getTime();
            ArrayList arrayList = new ArrayList();
            for (TezEvent tezEvent : ListUtils.emptyIfNull(events)) {
                tezEvent.setEventReceivedTime(time);
                if (tezEvent.getEventType() == EventType.TASK_STATUS_UPDATE_EVENT) {
                    this.context.getEventHandler().handle(new TaskAttemptEventStatusUpdate(taskAttemptId, tezEvent.getEvent()));
                } else {
                    arrayList.add(tezEvent);
                }
            }
            if (!arrayList.isEmpty()) {
                this.context.getEventHandler().handle(new VertexEventRouteEvent(taskAttemptId.getTaskID().getVertexID(), Collections.unmodifiableList(arrayList)));
            }
            this.taskHeartbeatHandler.pinged(taskAttemptId);
            taskAttemptEventInfo = this.context.getCurrentDAG().getVertex(taskAttemptId.getTaskID().getVertexID()).getTaskAttemptTezEvents(taskAttemptId, taskHeartbeatRequest.getStartIndex(), taskHeartbeatRequest.getPreRoutedStartIndex(), taskHeartbeatRequest.getMaxEvents());
        }
        return new TaskHeartbeatResponse(false, taskAttemptEventInfo.getEvents(), taskAttemptEventInfo.getNextFromEventId(), taskAttemptEventInfo.getNextPreRoutedFromEventId());
    }

    public void taskAlive(TezTaskAttemptID tezTaskAttemptID) {
        this.taskHeartbeatHandler.pinged(tezTaskAttemptID);
    }

    public void containerAlive(ContainerId containerId) {
        pingContainerHeartbeatHandler(containerId);
    }

    public void taskStartedRemotely(TezTaskAttemptID tezTaskAttemptID, ContainerId containerId) {
        this.context.getEventHandler().handle(new TaskAttemptEventStartedRemotely(tezTaskAttemptID, containerId, null));
        pingContainerHeartbeatHandler(containerId);
    }

    public void taskKilled(TezTaskAttemptID tezTaskAttemptID, TaskAttemptEndReason taskAttemptEndReason, String str) {
        this.context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(tezTaskAttemptID, str, TezUtilsInternal.fromTaskAttemptEndReason(taskAttemptEndReason)));
    }

    public void taskFailed(TezTaskAttemptID tezTaskAttemptID, TaskAttemptEndReason taskAttemptEndReason, String str) {
        this.context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(tezTaskAttemptID, TaskAttemptEventType.TA_FAILED, str, TezUtilsInternal.fromTaskAttemptEndReason(taskAttemptEndReason)));
    }

    public void vertexStateUpdateNotificationReceived(VertexStateUpdate vertexStateUpdate, int i) throws Exception {
        this.taskCommunicators[i].onVertexStateUpdated(vertexStateUpdate);
    }

    public boolean canCommit(TezTaskAttemptID tezTaskAttemptID) throws IOException {
        LOG.info("Commit go/no-go request from " + tezTaskAttemptID.toString());
        this.taskHeartbeatHandler.progressing(tezTaskAttemptID);
        pingContainerHeartbeatHandler(tezTaskAttemptID);
        return this.context.getCurrentDAG().getVertex(tezTaskAttemptID.getTaskID().getVertexID()).getTask(tezTaskAttemptID.getTaskID()).canCommit(tezTaskAttemptID);
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void dagComplete(DAG dag) {
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            ((TaskCommunicatorContextImpl) this.taskCommunicatorContexts[i]).dagCompleteStart(dag);
            this.taskCommunicators[i].dagComplete(dag.getName());
            ((TaskCommunicatorContextImpl) this.taskCommunicatorContexts[i]).dagCompleteEnd();
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void dagSubmitted() {
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void registerRunningContainer(ContainerId containerId, int i) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
        }
        if (this.registeredContainers.put(containerId, NULL_CONTAINER_INFO) != null) {
            throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
        }
        NodeId nodeId = this.context.getAllContainers().get(containerId).getContainer().getNodeId();
        this.taskCommunicators[i].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void unregisterRunningContainer(ContainerId containerId, int i, ContainerEndReason containerEndReason, String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
        }
        ContainerInfo remove = this.registeredContainers.remove(containerId);
        if (remove.taskAttemptId != null) {
            this.registeredAttempts.remove(remove.taskAttemptId);
        }
        this.taskCommunicators[i].registerContainerEnd(containerId, containerEndReason, str);
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void registerTaskAttempt(AMContainerTask aMContainerTask, ContainerId containerId, int i) {
        ContainerInfo containerInfo = this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            throw new TezUncheckedException("Registering task attempt: " + aMContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
        }
        if (containerInfo.taskAttemptId != null) {
            throw new TezUncheckedException("Registering task attempt: " + aMContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " with existing assignment to: " + containerInfo.taskAttemptId);
        }
        this.registeredContainers.put(containerId, new ContainerInfo(aMContainerTask.getTask().getTaskAttemptID()));
        ContainerId put = this.registeredAttempts.put(aMContainerTask.getTask().getTaskAttemptID(), containerId);
        if (put != null) {
            throw new TezUncheckedException("Registering task attempt: " + aMContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " when already assigned to: " + put);
        }
        this.taskCommunicators[i].registerRunningTaskAttempt(containerId, aMContainerTask.getTask(), aMContainerTask.getAdditionalResources(), aMContainerTask.getCredentials(), aMContainerTask.haveCredentialsChanged(), aMContainerTask.getPriority());
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void unregisterTaskAttempt(TezTaskAttemptID tezTaskAttemptID, int i, TaskAttemptEndReason taskAttemptEndReason, String str) {
        ContainerId remove = this.registeredAttempts.remove(tezTaskAttemptID);
        if (remove == null) {
            LOG.warn("Unregister task attempt: " + tezTaskAttemptID + " from unknown container");
        } else if (this.registeredContainers.get(remove) == null) {
            LOG.warn("Unregister task attempt: " + tezTaskAttemptID + " from non-registered container: " + remove);
        } else {
            this.registeredContainers.put(remove, NULL_CONTAINER_INFO);
            this.taskCommunicators[i].unregisterRunningTaskAttempt(tezTaskAttemptID, taskAttemptEndReason, str);
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public TaskCommunicator getTaskCommunicator(int i) {
        return this.taskCommunicators[i];
    }

    private void pingContainerHeartbeatHandler(ContainerId containerId) {
        this.containerHeartbeatHandler.pinged(containerId);
    }

    private void pingContainerHeartbeatHandler(TezTaskAttemptID tezTaskAttemptID) {
        ContainerId containerId = this.registeredAttempts.get(tezTaskAttemptID);
        if (containerId != null) {
            this.containerHeartbeatHandler.pinged(containerId);
        } else {
            LOG.warn("Handling communication from attempt: " + tezTaskAttemptID + ", ContainerId not known for this attempt");
        }
    }
}
