package org.apache.tez.dag.app;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/tez/dag/app/TestSpeculation.class */
/**
 * Tests for task speculation, run against a {@link MockDAGAppMaster} through a
 * {@link MockTezClient} configured for Tez local mode with speculation enabled.
 *
 * <p>Each test submits a small DAG, lets the mock launcher drive it to completion and
 * then inspects which attempt of task 0 succeeded, why the other attempt was terminated,
 * and the {@link TaskCounter#NUM_SPECULATIONS} counters.
 */
public class TestSpeculation {
    /** Base configuration shared by all tests; populated by the static initializer. */
    static Configuration defaultConf;
    /** Local file system obtained from {@link #defaultConf}; populated by the static initializer. */
    static FileSystem localFs;
    /** Mock application master of the most recently created session. */
    MockDAGAppMaster mockApp;
    /** Container launcher of {@link #mockApp}, used to control scheduling and status updates. */
    MockDAGAppMaster.MockContainerLauncher mockLauncher;

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            defaultConf.setBoolean("tez.am.speculation.enabled", true);
            localFs = FileSystem.getLocal(defaultConf);
            defaultConf.set("tez.staging-dir", "target/" + TestSpeculation.class.getName() + "-tmpDir");
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    /**
     * Creates and starts a {@link MockTezClient}, then waits for the mock application to
     * come up and records it in {@link #mockApp} / {@link #mockLauncher}.
     *
     * @return the started client; the caller is responsible for calling {@code stop()} on it
     * @throws Exception if the client cannot be started or the wait is interrupted
     */
    MockTezClient createTezSession() throws Exception {
        TezConfiguration tezConf = new TezConfiguration(defaultConf);
        AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
        MockTezClient tezClient = new MockTezClient("testspeculation", tezConf, true, null, null,
                new MockClock(), mockAppLauncherGoFlag, false, false, 1, 2);
        tezClient.start();
        syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
        return tezClient;
    }

    /**
     * Blocks until {@code atomicBoolean} becomes {@code true}, then captures the mock app
     * and its container launcher from the client, forwards {@code allowScheduling} to
     * {@code startScheduling} and notifies one thread waiting on the flag's monitor.
     *
     * @param allowScheduling value passed straight to {@code mockLauncher.startScheduling}
     * @param atomicBoolean   flag used both as the condition and as the monitor to wait on;
     *                        presumably set by the mock app launcher thread — TODO confirm
     * @param mockTezClient   client from which the mock app is obtained
     * @throws Exception if the wait is interrupted or the mock app cannot be obtained
     */
    void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean atomicBoolean,
            MockTezClient mockTezClient) throws Exception {
        synchronized (atomicBoolean) {
            // Loop guards against spurious wake-ups.
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
            this.mockApp = mockTezClient.getLocalClient().getMockApp();
            this.mockLauncher = this.mockApp.getContainerLauncher();
            this.mockLauncher.startScheduling(allowScheduling);
            atomicBoolean.notify();
        }
    }

    /**
     * Runs a single-vertex DAG and expects attempt 1 of task 0 to succeed while attempt 0
     * is terminated with {@link TaskAttemptTerminationCause#TERMINATED_EFFECTIVE_SPECULATION}.
     *
     * @param withProgress forwarded to {@code mockLauncher.updateProgress}; when {@code true}
     *                     the speculation counters on task, DAG and vertex are also checked
     * @throws Exception on any failure while running the DAG
     */
    public void testBasicSpeculation(boolean withProgress) throws Exception {
        DAG dag = DAG.create("test");
        // The trailing 5 is presumably the vertex parallelism — TODO confirm against Vertex.create.
        dag.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5));
        MockTezClient tezClient = createTezSession();
        try {
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGImpl dagImpl = this.mockApp.getContext().getCurrentDAG();
            TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
            TezTaskAttemptID killedAttemptId =
                    TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
            TezTaskAttemptID successAttemptId =
                    TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);

            this.mockLauncher.updateProgress(withProgress);
            // Only attempt 0 gets the extra status updates; presumably this delays it so that
            // the speculative attempt finishes first — TODO confirm against MockContainerLauncher.
            this.mockLauncher.setStatusUpdatesForTask(killedAttemptId, 100);
            this.mockLauncher.startScheduling(true);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());

            TaskImpl task = dagImpl.getTask(killedAttemptId.getTaskID());
            Assert.assertEquals(2L, task.getAttempts().size());
            Assert.assertEquals(successAttemptId, task.getSuccessfulAttempt().getID());

            TaskAttempt killedAttempt = task.getAttempt(killedAttemptId);
            // NOTE(review): the result of contains() is discarded, so the diagnostics text is
            // NOT verified here. Wrap in Assert.assertTrue only after confirming the literal
            // matches the diagnostic message actually produced for the killed attempt.
            Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
            Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION,
                    killedAttempt.getTerminationCause());

            if (withProgress) {
                Assert.assertEquals(1L,
                        task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
                Assert.assertEquals(1L,
                        dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
                Assert.assertEquals(1L,
                        dagImpl.getVertex(killedAttemptId.getTaskID().getVertexID()).getAllCounters()
                                .findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            }
        } finally {
            // Always stop the session so a failed assertion does not leak it into later tests.
            tezClient.stop();
        }
    }

    /** Runs {@link #testBasicSpeculation(boolean)} with progress updates enabled. */
    @Test(timeout = 10000)
    public void testBasicSpeculationWithProgress() throws Exception {
        testBasicSpeculation(true);
    }

    /** Runs {@link #testBasicSpeculation(boolean)} with progress updates disabled. */
    @Test(timeout = 10000)
    public void testBasicSpeculationWithoutProgress() throws Exception {
        testBasicSpeculation(false);
    }

    /**
     * Runs a single-vertex DAG where both attempts of task 0 receive the same status-update
     * setting and expects attempt 0 to succeed while attempt 1 is terminated with
     * {@link TaskAttemptTerminationCause#TERMINATED_INEFFECTIVE_SPECULATION}.
     *
     * @throws Exception on any failure while running the DAG
     */
    @Test(timeout = 10000)
    public void testBasicSpeculationNotUseful() throws Exception {
        DAG dag = DAG.create("test");
        // The trailing 5 is presumably the vertex parallelism — TODO confirm against Vertex.create.
        dag.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5));
        MockTezClient tezClient = createTezSession();
        try {
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGImpl dagImpl = this.mockApp.getContext().getCurrentDAG();
            TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
            TezTaskAttemptID successAttemptId =
                    TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
            TezTaskAttemptID killedAttemptId =
                    TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);

            this.mockLauncher.setStatusUpdatesForTask(successAttemptId, 100);
            this.mockLauncher.setStatusUpdatesForTask(killedAttemptId, 100);
            this.mockLauncher.startScheduling(true);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());

            TaskImpl task = dagImpl.getTask(killedAttemptId.getTaskID());
            Assert.assertEquals(2L, task.getAttempts().size());
            Assert.assertEquals(successAttemptId, task.getSuccessfulAttempt().getID());

            TaskAttempt killedAttempt = task.getAttempt(killedAttemptId);
            // NOTE(review): the result of contains() is discarded, so the diagnostics text is
            // NOT verified here. Wrap in Assert.assertTrue only after confirming the literal
            // matches the diagnostic message actually produced for the killed attempt.
            Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
            Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION,
                    killedAttempt.getTerminationCause());

            Assert.assertEquals(1L,
                    task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            Assert.assertEquals(1L,
                    dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            Assert.assertEquals(1L,
                    dagImpl.getVertex(killedAttemptId.getTaskID().getVertexID()).getAllCounters()
                            .findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        } finally {
            // Always stop the session so a failed assertion does not leak it into later tests.
            tezClient.stop();
        }
    }
}
