package org.apache.tez.dag.app;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager.class */
public class TestTaskCommunicatorManager {

    /* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager$ExceptionAnswer.class */
    private static class ExceptionAnswer implements Answer {
        private ExceptionAnswer() {
        }

        public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
            Method method = invocationOnMock.getMethod();
            if (!method.getDeclaringClass().equals(TaskCommunicator.class) || method.getName().equals("getContext") || method.getName().equals("initialize") || method.getName().equals("start") || method.getName().equals("shutdown")) {
                return invocationOnMock.callRealMethod();
            }
            throw new RuntimeException("TestException_" + method.getName());
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager$FakeTaskComm.class */
    public static class FakeTaskComm extends TaskCommunicator {
        public FakeTaskComm(TaskCommunicatorContext taskCommunicatorContext) {
            super(taskCommunicatorContext);
        }

        public void registerRunningContainer(ContainerId containerId, String str, int i) {
        }

        public void registerContainerEnd(ContainerId containerId, ContainerEndReason containerEndReason, String str) {
        }

        public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, Map<String, LocalResource> map, Credentials credentials, boolean z, int i) {
        }

        public void unregisterRunningTaskAttempt(TezTaskAttemptID tezTaskAttemptID, TaskAttemptEndReason taskAttemptEndReason, String str) {
        }

        public InetSocketAddress getAddress() {
            return null;
        }

        public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        }

        public void dagComplete(int i) {
        }

        public Object getMetaInfo() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager$TaskCommManagerForMultipleCommTest.class */
    static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager {
        private static final AtomicInteger numTaskComms = new AtomicInteger(0);
        private static final Set<Integer> taskCommIndices = new HashSet();
        private static final TaskCommunicator yarnTaskComm = (TaskCommunicator) Mockito.mock(TaskCommunicator.class);
        private static final TaskCommunicator uberTaskComm = (TaskCommunicator) Mockito.mock(TaskCommunicator.class);
        private static final AtomicBoolean yarnTaskCommCreated = new AtomicBoolean(false);
        private static final AtomicBoolean uberTaskCommCreated = new AtomicBoolean(false);
        private static final List<TaskCommunicatorContext> taskCommContexts = new LinkedList();
        private static final List<String> taskCommNames = new LinkedList();
        private static final List<TaskCommunicator> testTaskComms = new LinkedList();

        public static void reset() {
            numTaskComms.set(0);
            taskCommIndices.clear();
            yarnTaskCommCreated.set(false);
            uberTaskCommCreated.set(false);
            taskCommContexts.clear();
            taskCommNames.clear();
            testTaskComms.clear();
        }

        public TaskCommManagerForMultipleCommTest(AppContext appContext, TaskHeartbeatHandler taskHeartbeatHandler, ContainerHeartbeatHandler containerHeartbeatHandler, List<NamedEntityDescriptor> list) throws TezException {
            super(appContext, taskHeartbeatHandler, containerHeartbeatHandler, list);
        }

        TaskCommunicator createTaskCommunicator(NamedEntityDescriptor namedEntityDescriptor, int i) throws TezException {
            numTaskComms.incrementAndGet();
            Assert.assertTrue("Cannot add multiple taskComms with the same index", taskCommIndices.add(Integer.valueOf(i)));
            taskCommNames.add(namedEntityDescriptor.getEntityName());
            return super.createTaskCommunicator(namedEntityDescriptor, i);
        }

        TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
            taskCommContexts.add(taskCommunicatorContext);
            yarnTaskCommCreated.set(true);
            testTaskComms.add(yarnTaskComm);
            return yarnTaskComm;
        }

        TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
            taskCommContexts.add(taskCommunicatorContext);
            uberTaskCommCreated.set(true);
            testTaskComms.add(uberTaskComm);
            return uberTaskComm;
        }

        TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, NamedEntityDescriptor namedEntityDescriptor) throws TezException {
            taskCommContexts.add(taskCommunicatorContext);
            TaskCommunicator taskCommunicator = (TaskCommunicator) Mockito.spy(super.createCustomTaskCommunicator(taskCommunicatorContext, namedEntityDescriptor));
            testTaskComms.add(taskCommunicator);
            return taskCommunicator;
        }

        public static int getNumTaskComms() {
            return numTaskComms.get();
        }

        public static boolean getYarnTaskCommCreated() {
            return yarnTaskCommCreated.get();
        }

        public static boolean getUberTaskCommCreated() {
            return uberTaskCommCreated.get();
        }

        public static TaskCommunicatorContext getTaskCommContext(int i) {
            return taskCommContexts.get(i);
        }

        public static String getTaskCommName(int i) {
            return taskCommNames.get(i);
        }

        public static TaskCommunicator getTestTaskComm(int i) {
            return testTaskComms.get(i);
        }
    }

    @Before
    @After
    public void reset() {
        TaskCommManagerForMultipleCommTest.reset();
    }

    @Test(timeout = 5000)
    public void testNoTaskCommSpecified() throws IOException, TezException {
        try {
            new TaskCommManagerForMultipleCommTest((AppContext) Mockito.mock(AppContext.class), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), null);
            Assert.fail("Initialization should have failed without a TaskComm specified");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test(timeout = 5000)
    public void testCustomTaskCommSpecified() throws IOException, TezException {
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class);
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        ContainerHeartbeatHandler containerHeartbeatHandler = (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class);
        LinkedList linkedList = new LinkedList();
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(0, 3);
        linkedList.add(new NamedEntityDescriptor("customTaskComm", FakeTaskComm.class.getName()).setUserPayload(UserPayload.create(allocate)));
        TaskCommManagerForMultipleCommTest taskCommManagerForMultipleCommTest = new TaskCommManagerForMultipleCommTest(appContext, taskHeartbeatHandler, containerHeartbeatHandler, linkedList);
        try {
            taskCommManagerForMultipleCommTest.init(new Configuration(false));
            taskCommManagerForMultipleCommTest.start();
            Assert.assertEquals(1L, TaskCommManagerForMultipleCommTest.getNumTaskComms());
            Assert.assertFalse(TaskCommManagerForMultipleCommTest.getYarnTaskCommCreated());
            Assert.assertFalse(TaskCommManagerForMultipleCommTest.getUberTaskCommCreated());
            Assert.assertEquals("customTaskComm", TaskCommManagerForMultipleCommTest.getTaskCommName(0));
            Assert.assertEquals(allocate, TaskCommManagerForMultipleCommTest.getTaskCommContext(0).getInitialUserPayload().getPayload());
            taskCommManagerForMultipleCommTest.stop();
        } catch (Throwable th) {
            taskCommManagerForMultipleCommTest.stop();
            throw th;
        }
    }

    @Test(timeout = 5000)
    public void testMultipleTaskComms() throws IOException, TezException {
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class);
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        ContainerHeartbeatHandler containerHeartbeatHandler = (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class);
        Configuration configuration = new Configuration(false);
        configuration.set("testkey", "testvalue");
        UserPayload createUserPayloadFromConf = TezUtils.createUserPayloadFromConf(configuration);
        LinkedList linkedList = new LinkedList();
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(0, 3);
        linkedList.add(new NamedEntityDescriptor("customTaskComm", FakeTaskComm.class.getName()).setUserPayload(UserPayload.create(allocate)));
        linkedList.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), (String) null).setUserPayload(createUserPayloadFromConf));
        TaskCommManagerForMultipleCommTest taskCommManagerForMultipleCommTest = new TaskCommManagerForMultipleCommTest(appContext, taskHeartbeatHandler, containerHeartbeatHandler, linkedList);
        try {
            taskCommManagerForMultipleCommTest.init(new Configuration(false));
            taskCommManagerForMultipleCommTest.start();
            Assert.assertEquals(2L, TaskCommManagerForMultipleCommTest.getNumTaskComms());
            Assert.assertTrue(TaskCommManagerForMultipleCommTest.getYarnTaskCommCreated());
            Assert.assertFalse(TaskCommManagerForMultipleCommTest.getUberTaskCommCreated());
            Assert.assertEquals("customTaskComm", TaskCommManagerForMultipleCommTest.getTaskCommName(0));
            Assert.assertEquals(allocate, TaskCommManagerForMultipleCommTest.getTaskCommContext(0).getInitialUserPayload().getPayload());
            Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), TaskCommManagerForMultipleCommTest.getTaskCommName(1));
            Assert.assertEquals("testvalue", TezUtils.createConfFromUserPayload(TaskCommManagerForMultipleCommTest.getTaskCommContext(1).getInitialUserPayload()).get("testkey"));
            taskCommManagerForMultipleCommTest.stop();
        } catch (Throwable th) {
            taskCommManagerForMultipleCommTest.stop();
            throw th;
        }
    }

    @Test(timeout = 5000)
    public void testEventRouting() throws Exception {
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(appContext.getAllContainers().get((ContainerId) Matchers.any(ContainerId.class)).getContainer().getNodeId()).thenReturn(NodeId.newInstance("host1", 3131));
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        ContainerHeartbeatHandler containerHeartbeatHandler = (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class);
        Configuration configuration = new Configuration(false);
        configuration.set("testkey", "testvalue");
        UserPayload createUserPayloadFromConf = TezUtils.createUserPayloadFromConf(configuration);
        LinkedList linkedList = new LinkedList();
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(0, 3);
        linkedList.add(new NamedEntityDescriptor("customTaskComm", FakeTaskComm.class.getName()).setUserPayload(UserPayload.create(allocate)));
        linkedList.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), (String) null).setUserPayload(createUserPayloadFromConf));
        TaskCommManagerForMultipleCommTest taskCommManagerForMultipleCommTest = new TaskCommManagerForMultipleCommTest(appContext, taskHeartbeatHandler, containerHeartbeatHandler, linkedList);
        try {
            taskCommManagerForMultipleCommTest.init(new Configuration(false));
            taskCommManagerForMultipleCommTest.start();
            Assert.assertEquals(2L, TaskCommManagerForMultipleCommTest.getNumTaskComms());
            Assert.assertTrue(TaskCommManagerForMultipleCommTest.getYarnTaskCommCreated());
            Assert.assertFalse(TaskCommManagerForMultipleCommTest.getUberTaskCommCreated());
            ((TaskCommunicator) Mockito.verify(TaskCommManagerForMultipleCommTest.getTestTaskComm(0))).initialize();
            ((TaskCommunicator) Mockito.verify(TaskCommManagerForMultipleCommTest.getTestTaskComm(0))).start();
            ((TaskCommunicator) Mockito.verify(TaskCommManagerForMultipleCommTest.getTestTaskComm(1))).initialize();
            ((TaskCommunicator) Mockito.verify(TaskCommManagerForMultipleCommTest.getTestTaskComm(1))).start();
            ContainerId containerId = (ContainerId) Mockito.mock(ContainerId.class);
            taskCommManagerForMultipleCommTest.registerRunningContainer(containerId, 0);
            ((TaskCommunicator) Mockito.verify(TaskCommManagerForMultipleCommTest.getTestTaskComm(0))).registerRunningContainer((ContainerId) Matchers.eq(containerId), (String) Matchers.eq("host1"), Matchers.eq(3131));
            ContainerId containerId2 = (ContainerId) Mockito.mock(ContainerId.class);
            taskCommManagerForMultipleCommTest.registerRunningContainer(containerId2, 1);
            ((TaskCommunicator) Mockito.verify(TaskCommManagerForMultipleCommTest.getTestTaskComm(1))).registerRunningContainer((ContainerId) Matchers.eq(containerId2), (String) Matchers.eq("host1"), Matchers.eq(3131));
            taskCommManagerForMultipleCommTest.stop();
            ((TaskCommunicator) Mockito.verify(taskCommManagerForMultipleCommTest.getTaskCommunicator(0).getTaskCommunicator())).shutdown();
            ((TaskCommunicator) Mockito.verify(taskCommManagerForMultipleCommTest.getTaskCommunicator(1).getTaskCommunicator())).shutdown();
        } catch (Throwable th) {
            taskCommManagerForMultipleCommTest.stop();
            ((TaskCommunicator) Mockito.verify(taskCommManagerForMultipleCommTest.getTaskCommunicator(0).getTaskCommunicator())).shutdown();
            ((TaskCommunicator) Mockito.verify(taskCommManagerForMultipleCommTest.getTaskCommunicator(1).getTaskCommunicator())).shutdown();
            throw th;
        }
    }

    @Test(timeout = 5000)
    public void testTaskCommunicatorUserError() {
        TaskCommunicatorContextImpl taskCommunicatorContextImpl = (TaskCommunicatorContextImpl) Mockito.mock(TaskCommunicatorContextImpl.class);
        TaskCommunicator taskCommunicator = (TaskCommunicator) Mockito.mock(TaskCommunicator.class, new ExceptionAnswer());
        ((TaskCommunicator) Mockito.doReturn(taskCommunicatorContextImpl).when(taskCommunicator)).getContext();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(appContext.getEventHandler()).thenReturn(eventHandler);
        ((AppContext) Mockito.doReturn("testTaskCommunicator").when(appContext)).getTaskCommunicatorName(0);
        Configuration configuration = new Configuration(false);
        TaskCommunicatorManager taskCommunicatorManager = new TaskCommunicatorManager(taskCommunicator, appContext, (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class));
        try {
            taskCommunicatorManager.init(configuration);
            taskCommunicatorManager.start();
            DAG dag = (DAG) Mockito.mock(DAG.class, Mockito.RETURNS_DEEP_STUBS);
            Mockito.when(Integer.valueOf(dag.getID().getId())).thenReturn(1);
            taskCommunicatorManager.dagComplete(dag);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
            ((EventHandler) Mockito.verify(eventHandler, Mockito.times(1))).handle((Event) forClass.capture());
            DAGAppMasterEventUserServiceFatalError dAGAppMasterEventUserServiceFatalError = (Event) forClass.getValue();
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError instanceof DAGAppMasterEventUserServiceFatalError);
            DAGAppMasterEventUserServiceFatalError dAGAppMasterEventUserServiceFatalError2 = dAGAppMasterEventUserServiceFatalError;
            Assert.assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, dAGAppMasterEventUserServiceFatalError2.getType());
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError2.getError().getMessage().contains("TestException_dagComplete"));
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError2.getDiagnosticInfo().contains("DAG completion"));
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError2.getDiagnosticInfo().contains("[0:testTaskCommunicator]"));
            Mockito.when(appContext.getAllContainers().get((ContainerId) Matchers.any(ContainerId.class)).getContainer().getNodeId()).thenReturn(Mockito.mock(NodeId.class));
            taskCommunicatorManager.registerRunningContainer((ContainerId) Mockito.mock(ContainerId.class), 0);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
            ((EventHandler) Mockito.verify(eventHandler, Mockito.times(2))).handle((Event) forClass2.capture());
            DAGAppMasterEventUserServiceFatalError dAGAppMasterEventUserServiceFatalError3 = (Event) forClass2.getAllValues().get(1);
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError3 instanceof DAGAppMasterEventUserServiceFatalError);
            DAGAppMasterEventUserServiceFatalError dAGAppMasterEventUserServiceFatalError4 = dAGAppMasterEventUserServiceFatalError3;
            Assert.assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, dAGAppMasterEventUserServiceFatalError4.getType());
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError4.getError().getMessage().contains("TestException_registerRunningContainer"));
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError4.getDiagnosticInfo().contains("registering running Container"));
            Assert.assertTrue(dAGAppMasterEventUserServiceFatalError4.getDiagnosticInfo().contains("[0:testTaskCommunicator]"));
            taskCommunicatorManager.stop();
        } catch (Throwable th) {
            taskCommunicatorManager.stop();
            throw th;
        }
    }
}
