package org.apache.tez.dag.app;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/tez/dag/app/TestPreemption.class */
public class TestPreemption {
    static Configuration defaultConf;
    static FileSystem localFs;
    static Path workDir;
    MockDAGAppMaster mockApp;
    MockDAGAppMaster.MockContainerLauncher mockLauncher;
    int dagCount = 0;

    DAG createDAG(EdgeProperty.DataMovementType dataMovementType) {
        StringBuilder append = new StringBuilder().append("test-");
        int i = this.dagCount;
        this.dagCount = i + 1;
        DAG create = DAG.create(append.append(i).toString());
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        Vertex create3 = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5);
        create.addVertex(create2).addVertex(create3).addEdge(Edge.create(create2, create3, EdgeProperty.create(dataMovementType, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("O.class"), InputDescriptor.create("I.class"))));
        return create;
    }

    @Test(timeout = 5000)
    public void testPreemptionWithoutSession() throws Exception {
        DAGImpl currentDAG;
        System.out.println("TestPreemptionWithoutSession");
        TezConfiguration tezConfiguration = new TezConfiguration(defaultConf);
        tezConfiguration.setInt("tez.am.task.max.failed.attempts", 0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MockTezClient mockTezClient = new MockTezClient("testPreemption", tezConfiguration, false, null, null, atomicBoolean);
        mockTezClient.start();
        DAGClient submitDAG = mockTezClient.submitDAG(createDAG(EdgeProperty.DataMovementType.SCATTER_GATHER));
        syncWithMockAppLauncher(false, atomicBoolean, mockTezClient);
        do {
            Thread.sleep(100L);
            currentDAG = this.mockApp.getContext().getCurrentDAG();
        } while (currentDAG == null);
        TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), 0);
        this.mockLauncher.preemptContainerForTask(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0).getTaskID(), 3);
        this.mockLauncher.startScheduling(true);
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        for (int i = 0; i <= 3; i++) {
            Assert.assertEquals(TaskAttemptStateInternal.KILLED, currentDAG.getTaskAttempt(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), i)).getInternalState());
        }
        mockTezClient.stop();
    }

    @Test(timeout = 30000)
    public void testPreemptionWithSession() throws Exception {
        System.out.println("TestPreemptionWithSession");
        MockTezClient createTezSession = createTezSession();
        testPreemptionSingle(createTezSession, createDAG(EdgeProperty.DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather");
        testPreemptionMultiple(createTezSession, createDAG(EdgeProperty.DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather");
        testPreemptionSingle(createTezSession, createDAG(EdgeProperty.DataMovementType.BROADCAST), 0, "Broadcast");
        testPreemptionMultiple(createTezSession, createDAG(EdgeProperty.DataMovementType.BROADCAST), 0, "Broadcast");
        testPreemptionSingle(createTezSession, createDAG(EdgeProperty.DataMovementType.ONE_TO_ONE), 0, "1-1");
        testPreemptionMultiple(createTezSession, createDAG(EdgeProperty.DataMovementType.ONE_TO_ONE), 0, "1-1");
        testPreemptionSingle(createTezSession, createDAG(EdgeProperty.DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather");
        testPreemptionMultiple(createTezSession, createDAG(EdgeProperty.DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather");
        testPreemptionSingle(createTezSession, createDAG(EdgeProperty.DataMovementType.BROADCAST), 1, "Broadcast");
        testPreemptionMultiple(createTezSession, createDAG(EdgeProperty.DataMovementType.BROADCAST), 1, "Broadcast");
        testPreemptionSingle(createTezSession, createDAG(EdgeProperty.DataMovementType.ONE_TO_ONE), 1, "1-1");
        testPreemptionMultiple(createTezSession, createDAG(EdgeProperty.DataMovementType.ONE_TO_ONE), 1, "1-1");
        createTezSession.stop();
    }

    MockTezClient createTezSession() throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration(defaultConf);
        tezConfiguration.setInt("tez.am.task.max.failed.attempts", 0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MockTezClient mockTezClient = new MockTezClient("testPreemption", tezConfiguration, true, null, null, atomicBoolean);
        mockTezClient.start();
        syncWithMockAppLauncher(false, atomicBoolean, mockTezClient);
        return mockTezClient;
    }

    void syncWithMockAppLauncher(boolean z, AtomicBoolean atomicBoolean, MockTezClient mockTezClient) throws Exception {
        synchronized (atomicBoolean) {
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
            this.mockApp = mockTezClient.getLocalClient().getMockApp();
            this.mockLauncher = this.mockApp.m0getContainerLauncher();
            this.mockLauncher.startScheduling(z);
            atomicBoolean.notify();
        }
    }

    void testPreemptionSingle(MockTezClient mockTezClient, DAG dag, int i, String str) throws Exception {
        testPreemptionJob(mockTezClient, dag, i, 0, str + "-Single");
    }

    void testPreemptionMultiple(MockTezClient mockTezClient, DAG dag, int i, String str) throws Exception {
        testPreemptionJob(mockTezClient, dag, i, 3, str + "-Multiple");
    }

    void testPreemptionJob(MockTezClient mockTezClient, DAG dag, int i, int i2, String str) throws Exception {
        System.out.println("TestPreemption - Running - " + str);
        new TezConfiguration(defaultConf).setInt("tez.am.task.max.failed.attempts", 0);
        this.mockLauncher.startScheduling(false);
        DAGClient submitDAG = mockTezClient.submitDAG(dag);
        DAGImpl currentDAG = this.mockApp.getContext().getCurrentDAG();
        TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), i);
        this.mockLauncher.preemptContainerForTask(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0).getTaskID(), i2);
        this.mockLauncher.startScheduling(true);
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        for (int i3 = 0; i3 <= i2; i3++) {
            Assert.assertEquals(TaskAttemptStateInternal.KILLED, currentDAG.getTaskAttempt(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), i3)).getInternalState());
        }
        System.out.println("TestPreemption - Done running - " + str);
    }

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            localFs = FileSystem.getLocal(defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestDAGAppMaster").makeQualified(localFs);
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}
