package org.apache.tez.dag.app.rm;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.TaskSchedulerService;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeMap;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/dag/app/rm/TestContainerReuse.class */
public class TestContainerReuse {
    // Commons-logging logger named after this test class.
    private static final Log LOG = LogFactory.getLog(TestContainerReuse.class);

    /* loaded from: input_file:org/apache/tez/dag/app/rm/TestContainerReuse$ChangingDAGIDAnswer.class */
    private static class ChangingDAGIDAnswer implements Answer<TezDAGID> {
        private TezDAGID dagID;

        public ChangingDAGIDAnswer(TezDAGID tezDAGID) {
            this.dagID = tezDAGID;
        }

        public void setDAGID(TezDAGID tezDAGID) {
            this.dagID = tezDAGID;
        }

        /* renamed from: answer, reason: merged with bridge method [inline-methods] */
        public TezDAGID m13answer(InvocationOnMock invocationOnMock) throws Throwable {
            return this.dagID;
        }
    }

    /**
     * Container reuse with a 3000 ms locality delay and both rack and non-local fallback disabled.
     *
     * <p>Flow exercised by this test:
     * <ol>
     *   <li>Task 1 (host1) and task 2 (host2) are requested and two containers, one per host,
     *       are handed to the scheduler; each task must land on the container of its host.</li>
     *   <li>Task 3 (host1) is requested, then task 1 finishes successfully. Task 3 must be
     *       assigned to the host1 container, which must not be released or stopped.</li>
     *   <li>Task 2 finishes successfully. The test then polls for up to 5 seconds until the
     *       event handler reports the host2 container as being released.</li>
     * </ol>
     */
    @Test(timeout = 15000)
    public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException, ExecutionException {
        Configuration conf = new Configuration(new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", false);
        conf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", false);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 3000L);
        conf.setLong("tez.am.container.idle.release-timeout-min.millis", 0L);
        RackResolver.init(conf);

        TaskSchedulerService.TaskSchedulerAppCallback mockApp =
                Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler =
                new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
        TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
        TezAMRMClientAsync rmClient = Mockito.spy(
                new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(new TestTaskSchedulerHelpers.AMRMClientForTest(), 100));
        Mockito.doReturn(new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(
                FinalApplicationStatus.SUCCEEDED, "success", "url")).when(mockApp).getFinalAppStatus();

        AppContext appContext = Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap(
                Mockito.mock(ContainerHeartbeatHandler.class),
                Mockito.mock(TaskAttemptListener.class),
                new ContainerContextMatcher(),
                appContext);
        AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
        Mockito.doReturn(amContainerMap).when(appContext).getAllContainers();
        Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
        Mockito.doReturn(amNodeMap).when(appContext).getAllNodes();
        Mockito.doReturn(dagID).when(appContext).getCurrentDAGID();
        Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext).getClusterInfo();

        TaskSchedulerEventHandler taskSchedulerEventHandler = Mockito.spy(
                new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(
                        appContext, eventHandler, rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher()));
        taskSchedulerEventHandler.init(conf);
        taskSchedulerEventHandler.start();

        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler =
                ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback =
                taskScheduler.getDrainableAppCallback();
        // Flag handed to the delayed-container manager; waitForDelayedDrainNotify() blocks on it below.
        AtomicBoolean drainNotifier = new AtomicBoolean(false);
        taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;

        Resource resource = Resource.newInstance(1024, 1);
        Priority priority = Priority.newInstance(5);
        String[] host1 = {"host1"};
        String[] host2 = {"host2"};
        String[] defaultRack = {"/default-rack"};

        TezTaskAttemptID taID1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
        TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
        TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 3), 1);
        TaskAttempt ta1 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta2 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta3 = Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest launchRequest1 = createLaunchRequestEvent(taID1, ta1, resource, host1, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest2 = createLaunchRequestEvent(taID2, ta2, resource, host2, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest3 = createLaunchRequestEvent(taID3, ta3, resource, host1, defaultRack, priority);

        // Step 1: two requests, two containers (one per host).
        taskSchedulerEventHandler.handleEvent(launchRequest1);
        taskSchedulerEventHandler.handleEvent(launchRequest2);
        Container containerHost1 = createContainer(1, host1[0], resource, priority);
        Container containerHost2 = createContainer(2, host2[0], resource, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta1), Matchers.any(Object.class), Matchers.eq(containerHost1));
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(containerHost2));

        // Step 2: a second host1 task arrives, then the first host1 task completes.
        taskSchedulerEventHandler.handleEvent(launchRequest3);
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta1, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta1), Matchers.eq(true));
        Mockito.verify(taskSchedulerEventHandler, Mockito.times(1)).taskAllocated(Matchers.eq(ta3), Matchers.any(Object.class), Matchers.eq(containerHost1));
        Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(containerHost1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();

        // Step 3: the host2 task completes; wait for the host2 container to be reported as released.
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta2, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
        final long deadline = System.currentTimeMillis() + 5000L;
        AssertionError lastFailure;
        while (true) {
            try {
                // Verification runs at least once, even if the deadline has somehow already passed.
                Mockito.verify(taskSchedulerEventHandler, Mockito.times(1)).containerBeingReleased(Matchers.eq(containerHost2.getId()));
                lastFailure = null;
                break;
            } catch (AssertionError notReleasedYet) {
                // Only verification failures are retried; anything else propagates immediately.
                lastFailure = notReleasedYet;
                if (System.currentTimeMillis() >= deadline) {
                    break;
                }
                // Back off briefly instead of spinning on the CPU while the scheduler threads work.
                Thread.sleep(50L);
            }
        }
        if (lastFailure != null) {
            // Keep the underlying Mockito report; the assertion message alone does not say why.
            LOG.error("containerHost2 was not released", lastFailure);
        }
        Assert.assertTrue("containerHost2 was not released", lastFailure == null);

        taskScheduler.stop();
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    /**
     * Container reuse with a 1000 ms locality delay and both rack and non-local fallback disabled.
     *
     * <p>Tasks 1 (host1) and 2 (host2) are placed on one container per host. Task 3, which asks
     * for host1, is then requested and task 2 finishes on the host2 container. The test asserts
     * that task 3 is not assigned to the host2 container, that the host2 container is released
     * through the RM client exactly once, and that a container stop request event was captured.
     */
    @Test(timeout = 15000)
    public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException, ExecutionException {
        Configuration conf = new Configuration(new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", false);
        conf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", false);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 1000L);
        conf.setLong("tez.am.container.idle.release-timeout-min.millis", 0L);
        RackResolver.init(conf);

        TaskSchedulerService.TaskSchedulerAppCallback mockApp =
                Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler =
                new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
        TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient =
                Mockito.spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(rmClientCore, 100));
        TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus finalStatus =
                new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(
                        FinalApplicationStatus.SUCCEEDED, "success", "url");
        Mockito.doReturn(finalStatus).when(mockApp).getFinalAppStatus();

        AppContext appContext = Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap(
                Mockito.mock(ContainerHeartbeatHandler.class),
                Mockito.mock(TaskAttemptListener.class),
                new ContainerContextMatcher(),
                appContext);
        AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
        Mockito.doReturn(amContainerMap).when(appContext).getAllContainers();
        Mockito.doReturn(amNodeMap).when(appContext).getAllNodes();
        Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
        Mockito.doReturn(dagID).when(appContext).getCurrentDAGID();
        Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext).getClusterInfo();

        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest realHandler =
                new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(
                        appContext, eventHandler, rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = Mockito.spy(realHandler);
        taskSchedulerEventHandler.init(conf);
        taskSchedulerEventHandler.start();

        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler =
                ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback =
                taskScheduler.getDrainableAppCallback();
        // Flag handed to the delayed-container manager; waitForDelayedDrainNotify() blocks on it below.
        AtomicBoolean drainNotifier = new AtomicBoolean(false);
        taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;

        Resource resource = Resource.newInstance(1024, 1);
        Priority priority = Priority.newInstance(5);
        String[] host1 = {"host1"};
        String[] host2 = {"host2"};
        String[] defaultRack = {"/default-rack"};

        TezTaskAttemptID taID1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
        TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
        TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 3), 1);
        TaskAttempt ta1 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta2 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta3 = Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest launchRequest1 = createLaunchRequestEvent(taID1, ta1, resource, host1, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest2 = createLaunchRequestEvent(taID2, ta2, resource, host2, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest3 = createLaunchRequestEvent(taID3, ta3, resource, host1, defaultRack, priority);

        // Two requests, two containers (one per host).
        taskSchedulerEventHandler.handleEvent(launchRequest1);
        taskSchedulerEventHandler.handleEvent(launchRequest2);
        Container containerHost1 = createContainer(1, host1[0], resource, priority);
        Container containerHost2 = createContainer(2, host2[0], resource, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta1), Matchers.any(Object.class), Matchers.eq(containerHost1));
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(containerHost2));

        // A host1-only task is pending while only the host2 container becomes free.
        taskSchedulerEventHandler.handleEvent(launchRequest3);
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta2, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta2), Matchers.eq(true));
        Mockito.verify(taskSchedulerEventHandler, Mockito.times(0)).taskAllocated(Matchers.eq(ta3), Matchers.any(Object.class), Matchers.eq(containerHost2));
        Mockito.verify(rmClient, Mockito.times(1)).releaseAssignedContainer(Matchers.eq(containerHost2.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);

        taskScheduler.stop();
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    /**
     * Basic container reuse with rack fallback enabled and no locality delay.
     *
     * <p>Four tasks are requested up front: tasks 1 and 2 ask for host1, tasks 3 and 4 for host2.
     * A single host1 container is supplied and the test checks, in order:
     * <ol>
     *   <li>task 1 runs on it; after task 1 succeeds, task 2 is assigned to the same container;</li>
     *   <li>after task 2 succeeds, task 3 (host2) is assigned to the same container;</li>
     *   <li>after task 3 fails, task 4 is not assigned to it and the container is released
     *       with a stop request captured;</li>
     *   <li>a new host2 container is supplied, task 4 runs on it, and once task 4 succeeds the
     *       container is released with a stop request captured.</li>
     * </ol>
     */
    @Test(timeout = 10000)
    public void testSimpleReuse() throws IOException, InterruptedException, ExecutionException {
        Configuration conf = new Configuration(new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 0L);
        conf.setLong("tez.am.container.idle.release-timeout-min.millis", 0L);
        RackResolver.init(conf);

        TaskSchedulerService.TaskSchedulerAppCallback mockApp =
                Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler =
                new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient =
                Mockito.spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(rmClientCore, 100));
        TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus finalStatus =
                new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(
                        FinalApplicationStatus.SUCCEEDED, "success", "url");
        Mockito.doReturn(finalStatus).when(mockApp).getFinalAppStatus();

        AppContext appContext = Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap(
                Mockito.mock(ContainerHeartbeatHandler.class),
                Mockito.mock(TaskAttemptListener.class),
                new ContainerContextMatcher(),
                appContext);
        AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
        Mockito.doReturn(amContainerMap).when(appContext).getAllContainers();
        Mockito.doReturn(amNodeMap).when(appContext).getAllNodes();
        Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
        Mockito.doReturn(dagID).when(appContext).getCurrentDAGID();
        Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext).getClusterInfo();

        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest realHandler =
                new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(
                        appContext, eventHandler, rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = Mockito.spy(realHandler);
        taskSchedulerEventHandler.init(conf);
        taskSchedulerEventHandler.start();

        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler =
                ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback =
                taskScheduler.getDrainableAppCallback();
        // Flag handed to the delayed-container manager; waitForDelayedDrainNotify() blocks on it below.
        AtomicBoolean drainNotifier = new AtomicBoolean(false);
        taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;

        Resource resource = Resource.newInstance(1024, 1);
        String[] host1 = {"host1"};
        String[] host2 = {"host2"};
        String[] defaultRack = {"/default-rack"};
        Priority priority = Priority.newInstance(1);
        TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);

        // Tasks 1 and 2 want host1; tasks 3 and 4 want host2.
        TezTaskAttemptID taID1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
        TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
        TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 3), 1);
        TezTaskAttemptID taID4 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 4), 1);
        TaskAttempt ta1 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta2 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta3 = Mockito.mock(TaskAttempt.class);
        TaskAttempt ta4 = Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest launchRequest1 = createLaunchRequestEvent(taID1, ta1, resource, host1, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest2 = createLaunchRequestEvent(taID2, ta2, resource, host1, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest3 = createLaunchRequestEvent(taID3, ta3, resource, host2, defaultRack, priority);
        AMSchedulerEventTALaunchRequest launchRequest4 = createLaunchRequestEvent(taID4, ta4, resource, host2, defaultRack, priority);
        taskSchedulerEventHandler.handleEvent(launchRequest1);
        taskSchedulerEventHandler.handleEvent(launchRequest2);
        taskSchedulerEventHandler.handleEvent(launchRequest3);
        taskSchedulerEventHandler.handleEvent(launchRequest4);

        // One host1 container arrives: task 1 takes it.
        Container containerHost1 = createContainer(1, "host1", resource, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(containerHost1));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta1), Matchers.any(Object.class), Matchers.eq(containerHost1));

        // Task 1 succeeds: task 2 reuses the container, nothing is released.
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta1, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta1), Matchers.eq(true));
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(containerHost1));
        Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(containerHost1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();

        // Task 2 succeeds: task 3 (a host2 request) reuses the host1 container, nothing is released.
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta2, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta2), Matchers.eq(true));
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta3), Matchers.any(Object.class), Matchers.eq(containerHost1));
        Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(containerHost1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();

        // Task 3 fails: the container is not handed to task 4 and is released instead.
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta3, containerHost1.getId(), TaskAttemptState.FAILED));
        drainableAppCallback.drain();
        Mockito.verify(taskSchedulerEventHandler, Mockito.times(0)).taskAllocated(Matchers.eq(ta4), Matchers.any(Object.class), Matchers.eq(containerHost1));
        Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta3), Matchers.eq(false));
        Mockito.verify(rmClient).releaseAssignedContainer(Matchers.eq(containerHost1.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        eventHandler.reset();

        // A host2 container arrives: task 4 takes it.
        Container containerHost2 = createContainer(2, "host2", resource, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(containerHost2));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        Mockito.verify(taskSchedulerEventHandler).taskAllocated(Matchers.eq(ta4), Matchers.any(Object.class), Matchers.eq(containerHost2));

        // Task 4 succeeds with no requests left: the host2 container is released.
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta4, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta4), Matchers.eq(true));
        Mockito.verify(rmClient).releaseAssignedContainer(Matchers.eq(containerHost2.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        eventHandler.reset();

        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    @Test(timeout = 10000)
    public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, InterruptedException, ExecutionException {
        Configuration configuration = new Configuration(new YarnConfiguration());
        configuration.setBoolean("tez.am.container.reuse.enabled", true);
        configuration.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        configuration.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 0L);
        configuration.setLong("tez.am.container.idle.release-timeout-min.millis", 0L);
        configuration.set("tez.task-specific.launch.cmd-opts.list", "v1[1,3,4]");
        configuration.set("tez.task-specific.launch.cmd-opts", "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__");
        TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(configuration);
        RackResolver.init(configuration);
        TaskSchedulerService.TaskSchedulerAppCallback taskSchedulerAppCallback = (TaskSchedulerService.TaskSchedulerAppCallback) Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler capturingEventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID tezDAGID = TezDAGID.getInstance("0", 0, 0);
        TezAMRMClientAsync tezAMRMClientAsync = (TezAMRMClientAsync) Mockito.spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(new TestTaskSchedulerHelpers.AMRMClientForTest(), 100));
        ((TaskSchedulerService.TaskSchedulerAppCallback) Mockito.doReturn(new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "success", "url")).when(taskSchedulerAppCallback)).getFinalAppStatus();
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class);
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener) Mockito.mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
        AMNodeMap aMNodeMap = new AMNodeMap(capturingEventHandler, appContext);
        ((AppContext) Mockito.doReturn(aMContainerMap).when(appContext)).getAllContainers();
        ((AppContext) Mockito.doReturn(aMNodeMap).when(appContext)).getAllNodes();
        ((AppContext) Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext)).getAMState();
        ((AppContext) Mockito.doReturn(tezDAGID).when(appContext)).getCurrentDAGID();
        ((AppContext) Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext)).getClusterInfo();
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler) Mockito.spy(new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, capturingEventHandler, tezAMRMClientAsync, new ContainerContextMatcher()));
        taskSchedulerEventHandler.init(configuration);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback spyTaskScheduler = ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = spyTaskScheduler.getDrainableAppCallback();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        spyTaskScheduler.delayedContainerManager.drainedDelayedContainersForTest = atomicBoolean;
        Resource newInstance = Resource.newInstance(1024, 1);
        String[] strArr = {"host1"};
        String[] strArr2 = {"host2"};
        String[] strArr3 = {"host3"};
        String[] strArr4 = {"/default-rack"};
        Priority newInstance2 = Priority.newInstance(1);
        TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, 1);
        HashMap hashMap = new HashMap();
        String taskSpecificOption = taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 1);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 1), 1);
        TaskAttempt taskAttempt = (TaskAttempt) Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest createLaunchRequestEvent = createLaunchRequestEvent(tezTaskAttemptID, taskAttempt, newInstance, strArr, strArr4, newInstance2, hashMap, taskSpecificOption);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 2), 1);
        TaskAttempt taskAttempt2 = (TaskAttempt) Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest createLaunchRequestEvent2 = createLaunchRequestEvent(tezTaskAttemptID2, taskAttempt2, newInstance, strArr, strArr4, newInstance2);
        taskSchedulerEventHandler.handleEvent(createLaunchRequestEvent);
        taskSchedulerEventHandler.handleEvent(createLaunchRequestEvent2);
        Container createContainer = createContainer(1, "host1", newInstance, newInstance2);
        atomicBoolean.set(false);
        spyTaskScheduler.onContainersAllocated(Collections.singletonList(createContainer));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(atomicBoolean);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler) Mockito.verify(taskSchedulerEventHandler)).taskAllocated(Matchers.eq(taskAttempt), Matchers.any(Object.class), (Container) Matchers.eq(createContainer));
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(taskAttempt, createContainer.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback) Mockito.verify(spyTaskScheduler)).deallocateTask(Matchers.eq(taskAttempt), Matchers.eq(true));
        ((TaskSchedulerEventHandler) Mockito.verify(taskSchedulerEventHandler, Mockito.times(0))).taskAllocated(Matchers.eq(taskAttempt2), Matchers.any(Object.class), (Container) Matchers.eq(createContainer));
        ((TezAMRMClientAsync) Mockito.verify(tezAMRMClientAsync, Mockito.times(1))).releaseAssignedContainer((ContainerId) Matchers.eq(createContainer.getId()));
        capturingEventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        capturingEventHandler.reset();
        String taskSpecificOption2 = taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 3);
        TezTaskAttemptID tezTaskAttemptID3 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 3), 1);
        TaskAttempt taskAttempt3 = (TaskAttempt) Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest createLaunchRequestEvent3 = createLaunchRequestEvent(tezTaskAttemptID3, taskAttempt3, newInstance, strArr2, strArr4, newInstance2, hashMap, taskSpecificOption2);
        String taskSpecificOption3 = taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 4);
        TezTaskAttemptID tezTaskAttemptID4 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 4), 1);
        TaskAttempt taskAttempt4 = (TaskAttempt) Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest createLaunchRequestEvent4 = createLaunchRequestEvent(tezTaskAttemptID4, taskAttempt4, newInstance, strArr2, strArr4, newInstance2, hashMap, taskSpecificOption3);
        Container createContainer2 = createContainer(2, "host2", newInstance, newInstance2);
        taskSchedulerEventHandler.handleEvent(createLaunchRequestEvent3);
        taskSchedulerEventHandler.handleEvent(createLaunchRequestEvent4);
        atomicBoolean.set(false);
        spyTaskScheduler.onContainersAllocated(Collections.singletonList(createContainer2));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(atomicBoolean);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler) Mockito.verify(taskSchedulerEventHandler)).taskAllocated(Matchers.eq(taskAttempt3), Matchers.any(Object.class), (Container) Matchers.eq(createContainer2));
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(taskAttempt3, createContainer2.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback) Mockito.verify(spyTaskScheduler)).deallocateTask(Matchers.eq(taskAttempt3), Matchers.eq(true));
        ((TaskSchedulerEventHandler) Mockito.verify(taskSchedulerEventHandler, Mockito.times(0))).taskAllocated(Matchers.eq(taskAttempt4), Matchers.any(Object.class), (Container) Matchers.eq(createContainer2));
        ((TezAMRMClientAsync) Mockito.verify(tezAMRMClientAsync, Mockito.times(1))).releaseAssignedContainer((ContainerId) Matchers.eq(createContainer2.getId()));
        capturingEventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        capturingEventHandler.reset();
        configuration.set("tez.task-specific.launch.cmd-opts.list", "v1[1,2,3,5,6]");
        configuration.set("tez.task-specific.launch.cmd-opts", "dummyOpts");
        TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption2 = new TaskSpecificLaunchCmdOption(configuration);
        TezTaskAttemptID tezTaskAttemptID5 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 3), 1);
        TaskAttempt taskAttempt5 = (TaskAttempt) Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest createLaunchRequestEvent5 = createLaunchRequestEvent(tezTaskAttemptID5, taskAttempt5, newInstance, strArr3, strArr4, newInstance2, hashMap, taskSpecificLaunchCmdOption2.getTaskSpecificOption("", "v1", 5));
        taskSpecificLaunchCmdOption2.getTaskSpecificOption("", "v1", 4);
        TezTaskAttemptID tezTaskAttemptID6 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 4), 1);
        TaskAttempt taskAttempt6 = (TaskAttempt) Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest createLaunchRequestEvent6 = createLaunchRequestEvent(tezTaskAttemptID6, taskAttempt6, newInstance, strArr3, strArr4, newInstance2, hashMap, taskSpecificLaunchCmdOption2.getTaskSpecificOption("", "v1", 6));
        Container createContainer3 = createContainer(2, "host3", newInstance, newInstance2);
        taskSchedulerEventHandler.handleEvent(createLaunchRequestEvent5);
        taskSchedulerEventHandler.handleEvent(createLaunchRequestEvent6);
        atomicBoolean.set(false);
        spyTaskScheduler.onContainersAllocated(Collections.singletonList(createContainer3));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(atomicBoolean);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler) Mockito.verify(taskSchedulerEventHandler)).taskAllocated(Matchers.eq(taskAttempt5), Matchers.any(Object.class), (Container) Matchers.eq(createContainer3));
        taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(taskAttempt5, createContainer3.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback) Mockito.verify(spyTaskScheduler)).deallocateTask(Matchers.eq(taskAttempt5), Matchers.eq(true));
        ((TaskSchedulerEventHandler) Mockito.verify(taskSchedulerEventHandler)).taskAllocated(Matchers.eq(taskAttempt6), Matchers.any(Object.class), (Container) Matchers.eq(createContainer3));
        capturingEventHandler.reset();
        spyTaskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    /**
     * Exercises container reuse for task requests that carry an empty host list.
     *
     * <p>Flow checked by this test:
     * <ol>
     *   <li>Task 1 is assigned to a freshly allocated container on "randomHost".</li>
     *   <li>Task 2 is requested and task 1 ends; immediately afterwards task 2 has not been
     *       assigned to the container and the container has not been released.</li>
     *   <li>After a 3 second sleep task 2 is found assigned to the same container.</li>
     *   <li>After task 2 ends and another 3 second sleep, the container has been released and
     *       a stop request was sent.</li>
     * </ol>
     *
     * <p>The scheduler and its event handler are closed in a {@code finally} block so the
     * started services do not outlive the test when a verification fails.
     */
    @Test(timeout = 30000)
    public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException {
        Configuration conf = new Configuration(new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        conf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", true);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 100L);
        conf.setLong("tez.am.container.idle.release-timeout-min.millis", 1000L);
        conf.setLong("tez.am.container.idle.release-timeout-max.millis", 1000L);
        RackResolver.init(conf);

        // NOTE(review): this callback mock is stubbed but not handed to any object in this method.
        TaskSchedulerService.TaskSchedulerAppCallback appCallback = Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagId = TezDAGID.getInstance("0", 0, 0);
        TezAMRMClientAsync rmClient = Mockito.spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(new TestTaskSchedulerHelpers.AMRMClientForTest(), 100));
        Mockito.doReturn(new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "success", "url")).when(appCallback).getFinalAppStatus();

        // Mocked application context backed by real container/node maps.
        AppContext appContext = Mockito.mock(AppContext.class);
        AMContainerMap containerMap = new AMContainerMap(Mockito.mock(ContainerHeartbeatHandler.class), Mockito.mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
        AMNodeMap nodeMap = new AMNodeMap(eventHandler, appContext);
        Mockito.doReturn(containerMap).when(appContext).getAllContainers();
        Mockito.doReturn(nodeMap).when(appContext).getAllNodes();
        Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
        Mockito.doReturn(dagId).when(appContext).getCurrentDAGID();
        Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext).getClusterInfo();

        TaskSchedulerEventHandler schedulerEventHandler = Mockito.spy(new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher()));
        schedulerEventHandler.init(conf);
        schedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) schedulerEventHandler).getSpyTaskScheduler();
        try {
            TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableCallback = taskScheduler.getDrainableAppCallback();
            AtomicBoolean drainNotifier = new AtomicBoolean(false);
            taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;

            Resource resource = Resource.newInstance(1024, 1);
            String[] emptyHosts = new String[0];
            String[] racks = {"default-rack"};
            Priority priority = Priority.newInstance(3);
            TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);

            // Two attempts of the same vertex, both requested with an empty host list.
            TezTaskAttemptID attemptId1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 1), 1);
            TaskAttempt ta1 = Mockito.mock(TaskAttempt.class);
            Mockito.doReturn(vertexId).when(ta1).getVertexID();
            AMSchedulerEventTALaunchRequest launchRequest1 = createLaunchRequestEvent(attemptId1, ta1, resource, emptyHosts, racks, priority);

            TezTaskAttemptID attemptId2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 2), 1);
            TaskAttempt ta2 = Mockito.mock(TaskAttempt.class);
            Mockito.doReturn(vertexId).when(ta2).getVertexID();
            AMSchedulerEventTALaunchRequest launchRequest2 = createLaunchRequestEvent(attemptId2, ta2, resource, emptyHosts, racks, priority);

            // Request task 1 and hand the scheduler a container on an arbitrary host.
            schedulerEventHandler.handleEvent(launchRequest1);
            Container container = createContainer(1, "randomHost", resource, priority);
            drainNotifier.set(false);
            taskScheduler.onContainersAllocated(Collections.singletonList(container));
            TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
            drainableCallback.drain();
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta1), Matchers.any(Object.class), Matchers.eq(container));

            // Request task 2, then finish task 1 so that its container becomes idle.
            schedulerEventHandler.handleEvent(launchRequest2);
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta1, container.getId(), TaskAttemptState.SUCCEEDED));
            drainableCallback.drain();
            Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta1), Matchers.eq(true));
            // Right after deallocation: task 2 not yet placed, container neither released nor stopped.
            Mockito.verify(schedulerEventHandler, Mockito.times(0)).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(container));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
            eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
            eventHandler.reset();

            LOG.info("Sleeping to ensure that the scheduling loop runs");
            Thread.sleep(3000L);
            // By now task 2 must have been assigned to the held container.
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(container));

            // Finish task 2; with no further work the container is expected to be released.
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta2, container.getId(), TaskAttemptState.SUCCEEDED));
            drainableCallback.drain();
            LOG.info("Sleeping to ensure that the scheduling loop runs");
            Thread.sleep(3000L);
            Mockito.verify(rmClient).releaseAssignedContainer(Matchers.eq(container.getId()));
            eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        } finally {
            // Always stop both services, in the same order as on the success path.
            try {
                taskScheduler.close();
            } finally {
                schedulerEventHandler.close();
            }
        }
    }

    /**
     * Exercises reuse of a single container by task attempts that belong to different vertices
     * and were requested at different priorities, with the application context reporting
     * session mode.
     *
     * <p>Flow checked by this test:
     * <ol>
     *   <li>Task 1 (vertex 1, priority 3) is assigned to a newly allocated container on "host1".</li>
     *   <li>Task 2 (vertex 2, priority 4) is requested; once task 1 ends, task 2 is assigned to
     *       the same container and the container is not released.</li>
     *   <li>After task 2 ends, and again after a 3 second sleep, the container has still not
     *       been released.</li>
     * </ol>
     *
     * <p>The scheduler and its event handler are closed in a {@code finally} block so the
     * started services do not outlive the test when a verification fails.
     */
    @Test(timeout = 30000)
    public void testReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException {
        Configuration conf = new Configuration(new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 1L);
        conf.setLong("tez.am.container.idle.release-timeout-min.millis", 2000L);
        conf.setInt("tez.am.session.min.held-containers", 1);
        RackResolver.init(conf);

        // NOTE(review): this callback mock is stubbed but not handed to any object in this method.
        TaskSchedulerService.TaskSchedulerAppCallback appCallback = Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagId = TezDAGID.getInstance("0", 0, 0);
        TezAMRMClientAsync rmClient = Mockito.spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(new TestTaskSchedulerHelpers.AMRMClientForTest(), 100));
        Mockito.doReturn(new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "success", "url")).when(appCallback).getFinalAppStatus();

        // Mocked application context (session mode) backed by real container/node maps.
        AppContext appContext = Mockito.mock(AppContext.class);
        AMContainerMap containerMap = new AMContainerMap(Mockito.mock(ContainerHeartbeatHandler.class), Mockito.mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
        AMNodeMap nodeMap = new AMNodeMap(eventHandler, appContext);
        Mockito.doReturn(containerMap).when(appContext).getAllContainers();
        Mockito.doReturn(nodeMap).when(appContext).getAllNodes();
        Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
        Mockito.doReturn(true).when(appContext).isSession();
        Mockito.doReturn(dagId).when(appContext).getCurrentDAGID();
        Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext).getClusterInfo();

        TaskSchedulerEventHandler schedulerEventHandler = Mockito.spy(new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher()));
        schedulerEventHandler.init(conf);
        schedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) schedulerEventHandler).getSpyTaskScheduler();
        try {
            TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableCallback = taskScheduler.getDrainableAppCallback();
            AtomicBoolean drainNotifier = new AtomicBoolean(false);
            taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;

            Resource resource = Resource.newInstance(1024, 1);
            String[] hosts = {"host1"};
            String[] racks = {"/default-rack"};
            Priority priority1 = Priority.newInstance(3);
            Priority priority2 = Priority.newInstance(4);
            TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1);
            TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2);

            // One attempt per vertex; the second one is requested at a different priority.
            TezTaskAttemptID attemptId1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId1, 1), 1);
            TaskAttempt ta1 = Mockito.mock(TaskAttempt.class);
            Mockito.doReturn(vertexId1).when(ta1).getVertexID();
            AMSchedulerEventTALaunchRequest launchRequest1 = createLaunchRequestEvent(attemptId1, ta1, resource, hosts, racks, priority1);

            TezTaskAttemptID attemptId2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId2, 1), 1);
            TaskAttempt ta2 = Mockito.mock(TaskAttempt.class);
            Mockito.doReturn(vertexId2).when(ta2).getVertexID();
            AMSchedulerEventTALaunchRequest launchRequest2 = createLaunchRequestEvent(attemptId2, ta2, resource, hosts, racks, priority2);

            // Request task 1 and hand the scheduler a container on the requested host.
            schedulerEventHandler.handleEvent(launchRequest1);
            Container container = createContainer(1, hosts[0], resource, priority1);
            drainNotifier.set(false);
            taskScheduler.onContainersAllocated(Collections.singletonList(container));
            TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
            drainableCallback.drain();
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta1), Matchers.any(Object.class), Matchers.eq(container));

            // Request task 2 (other vertex), then finish task 1: the container must be handed to task 2.
            schedulerEventHandler.handleEvent(launchRequest2);
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta1, container.getId(), TaskAttemptState.SUCCEEDED));
            drainableCallback.drain();
            Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta1), Matchers.eq(true));
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(container));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));

            // Finish task 2; the container must not be released right away ...
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta2, container.getId(), TaskAttemptState.SUCCEEDED));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
            LOG.info("Sleeping to ensure that the scheduling loop runs");
            Thread.sleep(3000L);
            // ... nor after the sleep (presumably because of the min held-containers setting - confirm).
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
        } finally {
            // Always stop both services, in the same order as on the success path.
            try {
                taskScheduler.close();
            } finally {
                schedulerEventHandler.close();
            }
        }
    }

    /**
     * Exercises reuse of one container across two DAGs whose tasks request different sets of
     * local resources, with the application context reporting session mode.
     *
     * <p>Flow checked by this test:
     * <ol>
     *   <li>Two tasks of DAG 1 (one local resource, "rsrc1") run one after the other on the same
     *       container; each assignment event reports one remote task local resource.</li>
     *   <li>The current DAG id is switched to DAG 2 and two tasks with two local resources
     *       ("rsrc2", "rsrc3") are requested.</li>
     *   <li>After a 6 second sleep the first DAG 2 task is found on the same container, followed
     *       by the second one once the first ends; each assignment event reports two remote
     *       task local resources.</li>
     *   <li>Throughout, the container is never released and no stop request is sent.</li>
     * </ol>
     *
     * <p>The scheduler and its event handler are closed in a {@code finally} block so the
     * started services do not outlive the test when a verification fails.
     */
    @Test(timeout = 30000)
    public void testReuseLocalResourcesChanged() throws IOException, InterruptedException, ExecutionException {
        Configuration conf = new Configuration(new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        conf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", true);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 0L);
        conf.setLong("tez.am.container.idle.release-timeout-min.millis", -1L);
        RackResolver.init(conf);

        // NOTE(review): this callback mock is stubbed but not handed to any object in this method.
        TaskSchedulerService.TaskSchedulerAppCallback appCallback = Mockito.mock(TaskSchedulerService.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagId1 = TezDAGID.getInstance("0", 1, 0);
        TezAMRMClientAsync rmClient = Mockito.spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(new TestTaskSchedulerHelpers.AMRMClientForTest(), 100));
        Mockito.doReturn(new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "success", "url")).when(appCallback).getFinalAppStatus();

        // Mocked application context; the current DAG id is served by a switchable answer.
        AppContext appContext = Mockito.mock(AppContext.class);
        ChangingDAGIDAnswer currentDagAnswer = new ChangingDAGIDAnswer(dagId1);
        AMContainerMap containerMap = new AMContainerMap(Mockito.mock(ContainerHeartbeatHandler.class), Mockito.mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
        AMNodeMap nodeMap = new AMNodeMap(eventHandler, appContext);
        Mockito.doReturn(containerMap).when(appContext).getAllContainers();
        Mockito.doReturn(nodeMap).when(appContext).getAllNodes();
        Mockito.doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
        Mockito.doReturn(true).when(appContext).isSession();
        Mockito.doAnswer(currentDagAnswer).when(appContext).getCurrentDAGID();
        Mockito.doReturn(Mockito.mock(ClusterInfo.class)).when(appContext).getClusterInfo();

        TaskSchedulerEventHandler schedulerEventHandler = Mockito.spy(new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher()));
        schedulerEventHandler.init(conf);
        schedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = ((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest) schedulerEventHandler).getSpyTaskScheduler();
        try {
            TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableCallback = taskScheduler.getDrainableAppCallback();
            AtomicBoolean drainNotifier = new AtomicBoolean(false);
            taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;

            Resource resource = Resource.newInstance(1024, 1);
            String[] hosts = {"host1"};
            String[] racks = {"/default-rack"};
            Priority priority = Priority.newInstance(1);
            LocalResource rsrc1 = Mockito.mock(LocalResource.class);
            LocalResource rsrc2 = Mockito.mock(LocalResource.class);
            LocalResource rsrc3 = Mockito.mock(LocalResource.class);

            // ---- DAG 1: two tasks sharing a single local resource ----
            Map<String, LocalResource> dag1Resources = Maps.newHashMap();
            dag1Resources.put("rsrc1", rsrc1);
            TezVertexID dag1Vertex = TezVertexID.getInstance(dagId1, 1);

            TezTaskAttemptID attemptId1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(dag1Vertex, 1), 1);
            TaskAttempt ta1 = Mockito.mock(TaskAttempt.class);
            AMSchedulerEventTALaunchRequest launchRequest1 = createLaunchRequestEvent(attemptId1, ta1, resource, hosts, racks, priority, dag1Resources);

            TezTaskAttemptID attemptId2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(dag1Vertex, 2), 1);
            TaskAttempt ta2 = Mockito.mock(TaskAttempt.class);
            AMSchedulerEventTALaunchRequest launchRequest2 = createLaunchRequestEvent(attemptId2, ta2, resource, hosts, racks, priority, dag1Resources);

            schedulerEventHandler.handleEvent(launchRequest1);
            schedulerEventHandler.handleEvent(launchRequest2);

            // A single container arrives; task 1 gets it first.
            Container container = createContainer(1, "host1", resource, priority);
            drainNotifier.set(false);
            taskScheduler.onContainersAllocated(Collections.singletonList(container));
            TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
            drainableCallback.drain();
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta1), Matchers.any(Object.class), Matchers.eq(container));
            Assert.assertEquals(1L, eventHandler.verifyInvocation(AMContainerEventAssignTA.class).getRemoteTaskLocalResources().size());

            // Task 1 ends; task 2 takes over the same container without a release or stop.
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta1, container.getId(), TaskAttemptState.SUCCEEDED));
            drainableCallback.drain();
            Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta1), Matchers.eq(true));
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta2), Matchers.any(Object.class), Matchers.eq(container));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
            eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
            Assert.assertEquals(1L, eventHandler.verifyInvocation(AMContainerEventAssignTA.class).getRemoteTaskLocalResources().size());
            eventHandler.reset();

            // Task 2 ends; the now idle container is still held.
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta2, container.getId(), TaskAttemptState.SUCCEEDED));
            drainableCallback.drain();
            Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta2), Matchers.eq(true));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
            eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
            eventHandler.reset();

            // ---- DAG 2: switch the current DAG and request tasks with two different resources ----
            TezDAGID dagId2 = TezDAGID.getInstance("0", 2, 0);
            currentDagAnswer.setDAGID(dagId2);
            Map<String, LocalResource> dag2Resources = Maps.newHashMap();
            dag2Resources.put("rsrc2", rsrc2);
            dag2Resources.put("rsrc3", rsrc3);
            TezVertexID dag2Vertex = TezVertexID.getInstance(dagId2, 1);

            TezTaskAttemptID attemptId3 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(dag2Vertex, 1), 1);
            TaskAttempt ta3 = Mockito.mock(TaskAttempt.class);
            AMSchedulerEventTALaunchRequest launchRequest3 = createLaunchRequestEvent(attemptId3, ta3, resource, hosts, racks, priority, dag2Resources);

            TezTaskAttemptID attemptId4 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(dag2Vertex, 2), 1);
            TaskAttempt ta4 = Mockito.mock(TaskAttempt.class);
            AMSchedulerEventTALaunchRequest launchRequest4 = createLaunchRequestEvent(attemptId4, ta4, resource, hosts, racks, priority, dag2Resources);

            schedulerEventHandler.handleEvent(launchRequest3);
            schedulerEventHandler.handleEvent(launchRequest4);
            drainableCallback.drain();
            LOG.info("Sleeping to ensure that the scheduling loop runs");
            Thread.sleep(6000L);

            // The held container is picked up by the first DAG 2 task with the new resource set.
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta3), Matchers.any(Object.class), Matchers.eq(container));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
            eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
            Assert.assertEquals(2L, eventHandler.verifyInvocation(AMContainerEventAssignTA.class).getRemoteTaskLocalResources().size());
            eventHandler.reset();

            // Task 3 ends; task 4 reuses the same container.
            schedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta3, container.getId(), TaskAttemptState.SUCCEEDED));
            drainableCallback.drain();
            Mockito.verify(taskScheduler).deallocateTask(Matchers.eq(ta3), Matchers.eq(true));
            Mockito.verify(schedulerEventHandler).taskAllocated(Matchers.eq(ta4), Matchers.any(Object.class), Matchers.eq(container));
            Mockito.verify(rmClient, Mockito.times(0)).releaseAssignedContainer(Matchers.eq(container.getId()));
            eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
            Assert.assertEquals(2L, eventHandler.verifyInvocation(AMContainerEventAssignTA.class).getRemoteTaskLocalResources().size());
            eventHandler.reset();
        } finally {
            // Always stop both services, in the same order as on the success path.
            try {
                taskScheduler.close();
            } finally {
                schedulerEventHandler.close();
            }
        }
    }

    /**
     * Creates a YARN {@link Container} for use in tests.
     *
     * <p>The container id is built from a fixed application id (cluster timestamp 1, id 1) and
     * attempt id 1; the node uses port 0 and the node HTTP address is {@code str + ":0"}.
     * No container token is set.
     *
     * @param i        numeric id of the container within the fixed application attempt
     * @param str      host name used for both the node id and the node HTTP address
     * @param resource resource capability reported by the container
     * @param priority priority reported by the container
     * @return the newly created container
     */
    private Container createContainer(int i, String str, Resource resource, Priority priority) {
        ApplicationId appId = ApplicationId.newInstance(1L, 1);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
        ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
        NodeId nodeId = NodeId.newInstance(str, 0);
        String nodeHttpAddress = str + ":0";
        return Container.newInstance(containerId, nodeId, nodeHttpAddress, resource, priority, (Token) null);
    }

    /**
     * Builds a task-attempt launch request event around a fixed, minimal {@link TaskSpec}
     * (one input and one output, placeholder class names).
     *
     * <p>A location hint is attached when at least one of {@code strArr} / {@code strArr2} is
     * non-null. Each array is converted to a set independently, so supplying only one of them
     * no longer fails with a {@link NullPointerException}; the missing side is passed to the
     * hint as {@code null}.
     *
     * @param tezTaskAttemptID id of the attempt to launch
     * @param taskAttempt      task attempt object carried by the event
     * @param resource         requested resource capability
     * @param strArr           first (host) set for the location hint; may be {@code null}
     * @param strArr2          second (rack) set for the location hint; may be {@code null}
     * @param priority         request priority
     * @param containerContext container context carried by the event
     * @return the launch request event
     */
    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID tezTaskAttemptID, TaskAttempt taskAttempt, Resource resource, String[] strArr, String[] strArr2, Priority priority, ContainerContext containerContext) {
        TaskLocationHint taskLocationHint = null;
        if (strArr != null || strArr2 != null) {
            // Guard each array separately: Sets.newHashSet(null) would throw when only one side is given.
            taskLocationHint = TaskLocationHint.createTaskLocationHint(
                    strArr == null ? null : Sets.newHashSet(strArr),
                    strArr2 == null ? null : Sets.newHashSet(strArr2));
        }
        TaskSpec taskSpec = new TaskSpec(tezTaskAttemptID, "dagName", "vertexName", -1, ProcessorDescriptor.create("processorClassName"), Collections.singletonList(new InputSpec("vertexName", InputDescriptor.create("inputClassName"), 1)), Collections.singletonList(new OutputSpec("vertexName", OutputDescriptor.create("outputClassName"), 1)), (List) null);
        return new AMSchedulerEventTALaunchRequest(tezTaskAttemptID, resource, taskSpec, taskAttempt, taskLocationHint, priority, containerContext);
    }

    /**
     * Convenience overload that builds a launch request with a fresh, empty local-resource map.
     * Delegates to the overload taking a {@code Map<String, LocalResource>}.
     */
    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID tezTaskAttemptID, TaskAttempt taskAttempt, Resource resource, String[] strArr, String[] strArr2, Priority priority) {
        Map<String, LocalResource> noLocalResources = new HashMap<String, LocalResource>();
        return createLaunchRequestEvent(tezTaskAttemptID, taskAttempt, resource, strArr, strArr2, priority, noLocalResources);
    }

    /**
     * Convenience overload that builds a launch request with the given local-resource map and
     * an empty string for the extra string argument of the next overload.
     */
    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID tezTaskAttemptID, TaskAttempt taskAttempt, Resource resource, String[] strArr, String[] strArr2, Priority priority, Map<String, LocalResource> map) {
        final String emptyOption = "";
        return createLaunchRequestEvent(tezTaskAttemptID, taskAttempt, resource, strArr, strArr2, priority, map, emptyOption);
    }

    /**
     * Wraps the given local-resource map and string in a new {@link ContainerContext}, together
     * with fresh {@link Credentials} and a new empty map, and delegates to the overload that
     * takes a {@code ContainerContext}.
     *
     * @param map local resources handed to the container context
     * @param str string passed as the last container-context constructor argument
     *            (callers in this class pass task-specific launch options here)
     */
    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID tezTaskAttemptID, TaskAttempt taskAttempt, Resource resource, String[] strArr, String[] strArr2, Priority priority, Map<String, LocalResource> map, String str) {
        ContainerContext containerContext = new ContainerContext(map, new Credentials(), new HashMap(), str);
        return createLaunchRequestEvent(tezTaskAttemptID, taskAttempt, resource, strArr, strArr2, priority, containerContext);
    }
}
