package org.apache.tez.dag.app.dag.impl;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit.class */
public class TestCommit {
    // Per-test fixtures. Most of these are (re)assigned by setupDAG for every DAG plan.
    private TezDAGID dagId;
    private DrainDispatcher dispatcher;
    private Credentials fsTokens;
    // Mockito mock; its getters are stubbed in setupDAG.
    private AppContext appContext;
    private ACLManager aclManager;
    private ApplicationAttemptId appAttemptId;
    // The DAG under test, built from the plan handed to setupDAG.
    private DAGImpl dag;
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    // NOTE(review): no assignment to the next two fields is visible in this part of the file,
    // so DAGImpl appears to receive null for both -- confirm this is intended.
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private TaskHeartbeatHandler thh;
    private Clock clock = new SystemClock();
    private DAGFinishEventHandler dagFinishEventHandler;
    // Records critical history events so tests can count them afterwards.
    private MockHistoryEventHandler historyEventHandler;
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    // Backing pool for execService; teardown shuts it down through execService.
    private ExecutorService rawExecutor;
    private ListeningExecutorService execService;
    private static final Log LOG = LogFactory.getLog(TestCommit.class);
    // Shared by all tests; setupDAG and the individual tests mutate it.
    private static Configuration conf = new Configuration();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$ControlledThreadPoolExecutor.class */
    /**
     * Thread pool whose workers hold every task back until {@link #startFlag} is set to
     * {@code true}. This lets a caller queue work first and release it at a chosen moment.
     */
    public static class ControlledThreadPoolExecutor extends ThreadPoolExecutor {
        /**
         * Gate for task execution. It is written by the controlling thread and polled by the
         * pool's worker threads, so it must be {@code volatile} for the write to become visible.
         */
        public volatile boolean startFlag;

        /**
         * Creates a fixed-size pool of {@code i} threads backed by an unbounded queue.
         *
         * @param i number of core (and maximum) threads
         */
        public ControlledThreadPoolExecutor(int i) {
            this(i, i, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        /**
         * Creates a pool with the given sizing; the gate starts closed.
         *
         * @param i core pool size
         * @param i2 maximum pool size
         * @param j keep-alive time for idle threads above the core size
         * @param timeUnit unit of {@code j}
         * @param blockingQueue queue holding tasks before they are executed
         */
        public ControlledThreadPoolExecutor(int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue) {
            super(i, i2, j, timeUnit, blockingQueue);
            this.startFlag = false;
        }

        /**
         * Blocks the worker until the gate opens, polling every 100 ms. An interrupt received
         * while waiting does not open the gate; it is remembered and re-asserted on the worker
         * thread afterwards so the task about to run can still observe it.
         */
        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void beforeExecute(Thread thread, Runnable runnable) {
            boolean interrupted = false;
            try {
                while (!this.startFlag) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        // Keep waiting for the gate, but do not lose the interrupt.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            super.beforeExecute(thread, runnable);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$CountingOutputCommitter.class */
    public static class CountingOutputCommitter extends OutputCommitter {
        public volatile int initCounter;
        public volatile int setupCounter;
        public volatile int commitCounter;
        public volatile int abortCounter;
        private boolean throwError;
        private volatile boolean blockCommit;

        /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$CountingOutputCommitter$CountingOutputCommitterConfig.class */
        public static class CountingOutputCommitterConfig implements Writable {
            boolean throwError;
            boolean blockCommit;

            public CountingOutputCommitterConfig() {
                this.throwError = false;
                this.blockCommit = false;
            }

            public CountingOutputCommitterConfig(boolean z, boolean z2) {
                this.throwError = false;
                this.blockCommit = false;
                this.throwError = z;
                this.blockCommit = z2;
            }

            public CountingOutputCommitterConfig(UserPayload userPayload) throws IOException {
                this.throwError = false;
                this.blockCommit = false;
                DataInputByteBuffer dataInputByteBuffer = new DataInputByteBuffer();
                dataInputByteBuffer.reset(new ByteBuffer[]{userPayload.getPayload()});
                readFields(dataInputByteBuffer);
            }

            public void write(DataOutput dataOutput) throws IOException {
                dataOutput.writeBoolean(this.throwError);
                dataOutput.writeBoolean(this.blockCommit);
            }

            public void readFields(DataInput dataInput) throws IOException {
                this.throwError = dataInput.readBoolean();
                this.blockCommit = dataInput.readBoolean();
            }

            public byte[] toUserPayload() throws IOException {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                write(new DataOutputStream(byteArrayOutputStream));
                return byteArrayOutputStream.toByteArray();
            }
        }

        public CountingOutputCommitter(OutputCommitterContext outputCommitterContext) {
            super(outputCommitterContext);
            this.initCounter = 0;
            this.setupCounter = 0;
            this.commitCounter = 0;
            this.abortCounter = 0;
            this.throwError = false;
        }

        public void initialize() throws IOException {
            if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) {
                CountingOutputCommitterConfig countingOutputCommitterConfig = new CountingOutputCommitterConfig(getContext().getUserPayload());
                this.throwError = countingOutputCommitterConfig.throwError;
                this.blockCommit = countingOutputCommitterConfig.blockCommit;
            }
            this.initCounter++;
        }

        public void setupOutput() throws IOException {
            this.setupCounter++;
        }

        public void commitOutput() throws IOException {
            this.commitCounter++;
            while (this.blockCommit) {
                try {
                    Thread.sleep(100L);
                    TestCommit.LOG.info("committing output:" + getContext().getOutputName());
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
            if (this.throwError) {
                throw new RuntimeException("I can throwz exceptions in commit");
            }
        }

        public void unblockCommit() {
            this.blockCommit = false;
        }

        public void abortOutput(VertexStatus.State state) throws IOException {
            this.abortCounter++;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$DAGFinishEventHandler.class */
    public class DAGFinishEventHandler implements EventHandler<DAGAppMasterEventDAGFinished> {
        private DAGFinishEventHandler() {
        }

        public void handle(DAGAppMasterEventDAGFinished dAGAppMasterEventDAGFinished) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$DagEventDispatcher.class */
    public class DagEventDispatcher implements EventHandler<DAGEvent> {
        private DagEventDispatcher() {
        }

        public void handle(DAGEvent dAGEvent) {
            TestCommit.this.dag.handle(dAGEvent);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$FailOnVMEventReceivedlVertexManager.class */
    public static class FailOnVMEventReceivedlVertexManager extends ImmediateStartVertexManager {
        public FailOnVMEventReceivedlVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
            super(vertexManagerPluginContext);
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
            super.onVertexManagerEventReceived(vertexManagerEvent);
            throw new RuntimeException("fail vm");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$MockHistoryEventHandler.class */
    public static class MockHistoryEventHandler extends HistoryEventHandler {
        public boolean failVertexGroupCommitFinishedEvent;
        public boolean failDAGCommitStartedEvent;
        public Queue<HistoryEvent> historyEvents;

        public MockHistoryEventHandler(AppContext appContext) {
            super(appContext);
            this.failVertexGroupCommitFinishedEvent = false;
            this.failDAGCommitStartedEvent = false;
            this.historyEvents = new ConcurrentLinkedQueue();
        }

        public void handleCriticalEvent(DAGHistoryEvent dAGHistoryEvent) throws IOException {
            if (dAGHistoryEvent.getHistoryEvent().getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED && this.failVertexGroupCommitFinishedEvent) {
                throw new IOException("fail VertexGroupCommitFinishedEvent");
            }
            if (dAGHistoryEvent.getHistoryEvent().getEventType() == HistoryEventType.DAG_COMMIT_STARTED && this.failDAGCommitStartedEvent) {
                throw new IOException("fail DAGCommitStartedEvent");
            }
            this.historyEvents.add(dAGHistoryEvent.getHistoryEvent());
        }

        public void verifyVertexGroupCommitStartedEvent(String str, int i) {
            int i2 = 0;
            Iterator<HistoryEvent> it = this.historyEvents.iterator();
            while (it.hasNext()) {
                VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = (HistoryEvent) it.next();
                if (vertexGroupCommitStartedEvent.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_STARTED && vertexGroupCommitStartedEvent.getVertexGroupName().equals(str)) {
                    i2++;
                }
            }
            Assert.assertEquals(i, i2);
        }

        public void verifyVertexGroupCommitFinishedEvent(String str, int i) {
            int i2 = 0;
            Iterator<HistoryEvent> it = this.historyEvents.iterator();
            while (it.hasNext()) {
                VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = (HistoryEvent) it.next();
                if (vertexGroupCommitFinishedEvent.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED && vertexGroupCommitFinishedEvent.getVertexGroupName().equals(str)) {
                    i2++;
                }
            }
            Assert.assertEquals(i, i2);
        }

        public void verifyVertexCommitStartedEvent(TezVertexID tezVertexID, int i) {
            int i2 = 0;
            Iterator<HistoryEvent> it = this.historyEvents.iterator();
            while (it.hasNext()) {
                VertexCommitStartedEvent vertexCommitStartedEvent = (HistoryEvent) it.next();
                if (vertexCommitStartedEvent.getEventType() == HistoryEventType.VERTEX_COMMIT_STARTED && vertexCommitStartedEvent.getVertexID().equals(tezVertexID)) {
                    i2++;
                }
            }
            Assert.assertEquals(i, i2);
        }

        public void verifyVertexFinishedEvent(TezVertexID tezVertexID, int i) {
            int i2 = 0;
            Iterator<HistoryEvent> it = this.historyEvents.iterator();
            while (it.hasNext()) {
                VertexFinishedEvent vertexFinishedEvent = (HistoryEvent) it.next();
                if (vertexFinishedEvent.getEventType() == HistoryEventType.VERTEX_FINISHED && vertexFinishedEvent.getVertexID().equals(tezVertexID)) {
                    i2++;
                }
            }
            Assert.assertEquals(i, i2);
        }

        public void verifyDAGCommitStartedEvent(TezDAGID tezDAGID, int i) {
            int i2 = 0;
            Iterator<HistoryEvent> it = this.historyEvents.iterator();
            while (it.hasNext()) {
                DAGCommitStartedEvent dAGCommitStartedEvent = (HistoryEvent) it.next();
                if (dAGCommitStartedEvent.getEventType() == HistoryEventType.DAG_COMMIT_STARTED && dAGCommitStartedEvent.getDagID().equals(tezDAGID)) {
                    i2++;
                }
            }
            Assert.assertEquals(i, i2);
        }

        public void verifyDAGFinishedEvent(TezDAGID tezDAGID, int i) {
            int i2 = 0;
            Iterator<HistoryEvent> it = this.historyEvents.iterator();
            while (it.hasNext()) {
                DAGFinishedEvent dAGFinishedEvent = (HistoryEvent) it.next();
                if (dAGFinishedEvent.getEventType() == HistoryEventType.DAG_FINISHED && dAGFinishedEvent.getDagID().equals(tezDAGID)) {
                    i2++;
                }
            }
            Assert.assertEquals(i, i2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$TaskAttemptEventDispatcher.class */
    public class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        public void handle(TaskAttemptEvent taskAttemptEvent) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$TaskEventDispatcher.class */
    public class TaskEventDispatcher implements EventHandler<TaskEvent> {
        private TaskEventDispatcher() {
        }

        public void handle(TaskEvent taskEvent) {
            TestCommit.this.dag.getVertex(taskEvent.getTaskID().getVertexID()).getTask(taskEvent.getTaskID()).handle(taskEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestCommit$VertexEventDispatcher.class */
    public class VertexEventDispatcher implements EventHandler<VertexEvent> {
        private VertexEventDispatcher() {
        }

        public void handle(VertexEvent vertexEvent) {
            TestCommit.this.dag.getVertex(vertexEvent.getVertexId()).handle(vertexEvent);
        }
    }

    /**
     * Builds the fixture for one test: ids, a mocked {@link AppContext}, the shared executor,
     * the {@link DAGImpl} for {@code dAGPlan}, and a started dispatcher with all event
     * handlers registered.
     *
     * @param dAGPlan plan from which the DAG under test is created
     */
    public void setupDAG(DAGProtos.DAGPlan dAGPlan) {
        conf.setBoolean("tez.am.container.reuse.enabled", false);

        // Identifiers.
        appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(100L, 1), 1);
        dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
        Assert.assertNotNull(dagId);

        dispatcher = new DrainDispatcher();
        fsTokens = new Credentials();

        // Mocked application context and the executor it hands out.
        appContext = Mockito.mock(AppContext.class);
        Mockito.when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
        rawExecutor = Executors.newCachedThreadPool(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("App Shared Pool - #%d").build());
        execService = MoreExecutors.listeningDecorator(rawExecutor);
        Mockito.doReturn(execService).when(appContext).getExecService();
        historyEventHandler = new MockHistoryEventHandler(appContext);
        aclManager = new ACLManager("amUser");
        Mockito.doReturn(conf).when(appContext).getAMConf();
        Mockito.doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
        Mockito.doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
        Mockito.doReturn(dagId).when(appContext).getCurrentDAGID();
        Mockito.doReturn(historyEventHandler).when(appContext).getHistoryHandler();
        Mockito.doReturn(aclManager).when(appContext).getAMACLManager();

        // The DAG under test, then the stubs that depend on it.
        dag = new DAGImpl(dagId, conf, dAGPlan, dispatcher.getEventHandler(),
            taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext);
        Mockito.doReturn(dag).when(appContext).getCurrentDAG();
        Mockito.doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
        Mockito.doReturn(new ClusterInfo(Resource.newInstance(8192, 10))).when(appContext).getClusterInfo();

        // Event routing.
        dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
        taskEventDispatcher = new TaskEventDispatcher();
        dispatcher.register(TaskEventType.class, taskEventDispatcher);
        taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
        vertexEventDispatcher = new VertexEventDispatcher();
        dispatcher.register(VertexEventType.class, vertexEventDispatcher);
        dagEventDispatcher = new DagEventDispatcher();
        dispatcher.register(DAGEventType.class, dagEventDispatcher);
        dagFinishEventHandler = new DAGFinishEventHandler();
        dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler);

        dispatcher.init(conf);
        dispatcher.start();
    }

    /**
     * Drains and stops the dispatcher, then shuts the shared executor down. The executor is
     * shut down in a {@code finally} block so its threads are released even when draining or
     * stopping the dispatcher throws.
     */
    @After
    public void teardown() {
        try {
            if (this.dispatcher != null) {
                this.dispatcher.await();
                this.dispatcher.stop();
            }
        } finally {
            if (this.execService != null) {
                this.execService.shutdownNow();
            }
        }
    }

    /**
     * Polls every 100 ms until the DAG reports the given state.
     *
     * @param dAGImpl DAG to observe
     * @param dAGState state to wait for
     * @throws AssertionError if the waiting thread is interrupted; the interrupt status is
     *         restored first so the test stops instead of polling on
     */
    private void waitUntil(DAGImpl dAGImpl, DAGState dAGState) {
        while (dAGImpl.getState() != dAGState) {
            LOG.info("Wait for dag go to state:" + dAGState);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for dag to reach state " + dAGState, e);
            }
        }
    }

    /**
     * Polls every 100 ms until the vertex reports the given state.
     *
     * @param vertexImpl vertex to observe
     * @param vertexState state to wait for
     * @throws AssertionError if the waiting thread is interrupted; the interrupt status is
     *         restored first so the test stops instead of polling on
     */
    private void waitUntil(VertexImpl vertexImpl, VertexState vertexState) {
        while (vertexImpl.getState() != vertexState) {
            LOG.info("Wait for vertex " + vertexImpl.getLogIdentifier() + " go to state:" + vertexState);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for vertex "
                    + vertexImpl.getLogIdentifier() + " to reach state " + vertexState, e);
            }
        }
    }

    /**
     * Polls every 200 ms until the vertex no longer tracks a commit future for the output.
     *
     * @param vertexImpl vertex whose {@code commitFutures} map is inspected
     * @param str output name used as the key in {@code commitFutures}
     * @throws AssertionError if the waiting thread is interrupted; the interrupt status is
     *         restored first so the test stops instead of polling on
     */
    private void waitForCommitCompleted(VertexImpl vertexImpl, String str) {
        while (vertexImpl.commitFutures.containsKey(str)) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for vertex commit " + str + " to complete", e);
            }
            LOG.info("Wait for vertex commit " + str + " to complete");
        }
    }

    /**
     * Polls every 200 ms until the DAG no longer tracks a commit future for the output key.
     *
     * @param dAGImpl DAG whose {@code commitFutures} map is inspected
     * @param outputKey key looked up in {@code commitFutures}
     * @throws AssertionError if the waiting thread is interrupted; the interrupt status is
     *         restored first so the test stops instead of polling on
     */
    private void waitForCommitCompleted(DAGImpl dAGImpl, DAGImpl.OutputKey outputKey) {
        while (dAGImpl.commitFutures.containsKey(outputKey)) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for dag commit " + outputKey + " to complete", e);
            }
            LOG.info("Wait for dag commit " + outputKey + " to complete");
        }
    }

    /**
     * Builds a three-vertex plan: vertex1 and vertex2 form group "uv12", which feeds vertex3.
     * The group has data sink "v12Out" and vertex3 has data sink "v3Out". Both committers are
     * configured to block on commit.
     *
     * @param z {@code false} makes the "v12Out" committer throw once its commit is unblocked
     * @param z2 {@code false} makes the "v3Out" committer throw once its commit is unblocked
     * @return the serialized DAG plan
     * @throws Exception if a committer payload cannot be serialized or the DAG is invalid
     */
    private DAGProtos.DAGPlan createDAGPlan(boolean z, boolean z2) throws Exception {
        LOG.info("Setting up group dag plan");
        Resource taskResource = Resource.newInstance(1, 1);
        Vertex v1 = Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), 1, taskResource);
        Vertex v2 = Vertex.create("vertex2", ProcessorDescriptor.create("Processor"), 1, taskResource);
        Vertex v3 = Vertex.create("vertex3", ProcessorDescriptor.create("Processor"), 1, taskResource);
        DAG plan = DAG.create("testDag");

        String committerClass = CountingOutputCommitter.class.getName();
        UserPayload groupPayload = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z, true).toUserPayload()));
        OutputCommitterDescriptor groupCommitter =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(groupPayload);
        UserPayload v3Payload = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z2, true).toUserPayload()));
        OutputCommitterDescriptor v3Committer =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(v3Payload);

        VertexGroup uv12 = plan.createVertexGroup("uv12", new Vertex[]{v1, v2});
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        uv12.addDataSink("v12Out", DataSinkDescriptor.create(sinkOutput, groupCommitter, (Credentials) null));
        v3.addDataSink("v3Out", DataSinkDescriptor.create(sinkOutput, v3Committer, (Credentials) null));

        EdgeProperty edgeProperty = EdgeProperty.create(
            EdgeProperty.DataMovementType.SCATTER_GATHER,
            EdgeProperty.DataSourceType.PERSISTED,
            EdgeProperty.SchedulingType.SEQUENTIAL,
            OutputDescriptor.create("dummy output class"),
            InputDescriptor.create("dummy input class"));
        GroupInputEdge groupEdge =
            GroupInputEdge.create(uv12, v3, edgeProperty, InputDescriptor.create("merge.class"));

        plan.addVertex(v1);
        plan.addVertex(v2);
        plan.addVertex(v3);
        plan.addEdge(groupEdge);
        return plan.createDag(conf, (Credentials) null, (Map) null, (LocalResource) null, true);
    }

    /**
     * Builds a three-vertex plan like {@code createDAGPlan}, but group "uv12" has two data
     * sinks ("v12Out1", "v12Out2") in addition to vertex3's "v3Out". All committers are
     * configured to block on commit.
     *
     * @param z {@code false} makes the "v12Out1" committer throw once its commit is unblocked
     * @param z2 {@code false} makes the "v12Out2" committer throw once its commit is unblocked
     * @param z3 {@code false} makes the "v3Out" committer throw once its commit is unblocked
     * @return the serialized DAG plan
     * @throws Exception if a committer payload cannot be serialized or the DAG is invalid
     */
    private DAGProtos.DAGPlan createDAGPlanWith2VertexGroupOutputs(boolean z, boolean z2, boolean z3) throws Exception {
        LOG.info("Setting up group dag plan");
        Resource taskResource = Resource.newInstance(1, 1);
        Vertex v1 = Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), 1, taskResource);
        Vertex v2 = Vertex.create("vertex2", ProcessorDescriptor.create("Processor"), 1, taskResource);
        Vertex v3 = Vertex.create("vertex3", ProcessorDescriptor.create("Processor"), 1, taskResource);
        DAG plan = DAG.create("testDag");

        String committerClass = CountingOutputCommitter.class.getName();
        UserPayload groupPayload1 = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z, true).toUserPayload()));
        OutputCommitterDescriptor groupCommitter1 =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(groupPayload1);
        UserPayload groupPayload2 = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z2, true).toUserPayload()));
        OutputCommitterDescriptor groupCommitter2 =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(groupPayload2);
        UserPayload v3Payload = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z3, true).toUserPayload()));
        OutputCommitterDescriptor v3Committer =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(v3Payload);

        VertexGroup uv12 = plan.createVertexGroup("uv12", new Vertex[]{v1, v2});
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        uv12.addDataSink("v12Out1", DataSinkDescriptor.create(sinkOutput, groupCommitter1, (Credentials) null));
        uv12.addDataSink("v12Out2", DataSinkDescriptor.create(sinkOutput, groupCommitter2, (Credentials) null));
        v3.addDataSink("v3Out", DataSinkDescriptor.create(sinkOutput, v3Committer, (Credentials) null));

        EdgeProperty edgeProperty = EdgeProperty.create(
            EdgeProperty.DataMovementType.SCATTER_GATHER,
            EdgeProperty.DataSourceType.PERSISTED,
            EdgeProperty.SchedulingType.SEQUENTIAL,
            OutputDescriptor.create("dummy output class"),
            InputDescriptor.create("dummy input class"));
        GroupInputEdge groupEdge =
            GroupInputEdge.create(uv12, v3, edgeProperty, InputDescriptor.create("merge.class"));

        plan.addVertex(v1);
        plan.addVertex(v2);
        plan.addVertex(v3);
        plan.addEdge(groupEdge);
        return plan.createDag(conf, (Credentials) null, (Map) null, (LocalResource) null, true);
    }

    /**
     * Convenience overload: builds the single-vertex, two-committer plan without installing
     * the failing vertex manager.
     *
     * @param z forwarded as the first flag of the three-argument overload
     * @param z2 forwarded as the second flag of the three-argument overload
     * @return the serialized DAG plan
     * @throws IOException if a committer payload cannot be serialized
     */
    private DAGProtos.DAGPlan createDAGPlan_SingleVertexWith2Committer(boolean z, boolean z2) throws IOException {
        final boolean installFailingVertexManager = false;
        return createDAGPlan_SingleVertexWith2Committer(z, z2, installFailingVertexManager);
    }

    /**
     * Builds a plan with one vertex ("vertex1") that has two data sinks, "v1Out_1" and
     * "v1Out_2". Both committers are configured to block on commit.
     *
     * @param z {@code false} makes the "v1Out_1" committer throw once its commit is unblocked
     * @param z2 {@code false} makes the "v1Out_2" committer throw once its commit is unblocked
     * @param z3 {@code true} installs {@link FailOnVMEventReceivedlVertexManager} on the vertex
     * @return the serialized DAG plan
     * @throws IOException if a committer payload cannot be serialized
     */
    private DAGProtos.DAGPlan createDAGPlan_SingleVertexWith2Committer(boolean z, boolean z2, boolean z3) throws IOException {
        LOG.info("Setting up group dag plan");
        Resource taskResource = Resource.newInstance(1, 1);
        Vertex v1 = Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), 1, taskResource);
        if (z3) {
            String managerClass = FailOnVMEventReceivedlVertexManager.class.getName();
            v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(managerClass));
        }

        String committerClass = CountingOutputCommitter.class.getName();
        UserPayload firstPayload = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z, true).toUserPayload()));
        OutputCommitterDescriptor firstCommitter =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(firstPayload);
        UserPayload secondPayload = UserPayload.create(ByteBuffer.wrap(
            new CountingOutputCommitter.CountingOutputCommitterConfig(!z2, true).toUserPayload()));
        OutputCommitterDescriptor secondCommitter =
            OutputCommitterDescriptor.create(committerClass).setUserPayload(secondPayload);

        DAG plan = DAG.create("testDag");
        plan.addVertex(v1);
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        v1.addDataSink("v1Out_1", DataSinkDescriptor.create(sinkOutput, firstCommitter, (Credentials) null));
        v1.addDataSink("v1Out_2", DataSinkDescriptor.create(sinkOutput, secondCommitter, (Credentials) null));
        return plan.createDag(conf, (Credentials) null, (Map) null, (LocalResource) null, true);
    }

    /**
     * Sends DAG_INIT directly to the DAG and asserts that it ends up in state INITED.
     *
     * @param dAGImpl DAG to initialize
     */
    private void initDAG(DAGImpl dAGImpl) {
        DAGEvent initEvent = new DAGEvent(dAGImpl.getID(), DAGEventType.DAG_INIT);
        dAGImpl.handle(initEvent);
        DAGState stateAfterInit = dAGImpl.getState();
        Assert.assertEquals(DAGState.INITED, stateAfterInit);
    }

    /**
     * Posts a start event through the dispatcher, waits for the dispatcher to drain, and
     * asserts that the DAG is RUNNING.
     *
     * @param dAGImpl DAG to start
     */
    private void startDAG(DAGImpl dAGImpl) {
        DAGEventStartDag startEvent = new DAGEventStartDag(dAGImpl.getID(), (List) null);
        this.dispatcher.getEventHandler().handle(startEvent);
        this.dispatcher.await();
        DAGState stateAfterStart = dAGImpl.getState();
        Assert.assertEquals(DAGState.RUNNING, stateAfterStart);
    }

    /**
     * With commit-all-outputs-on-dag-success enabled, completing the vertex's only task must
     * move the vertex straight to SUCCEEDED without starting any commit: no commit-started
     * history event, one vertex-finished event, and both committers initialized and set up
     * but neither committed nor aborted.
     */
    @Test(timeout = 5000)
    public void testVertexCommit_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        // Down-cast to the implementation type, as the sibling tests do: the assertions below
        // need VertexImpl members such as commitFutures.
        VertexImpl vertex = (VertexImpl) this.dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertNull(vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());
        CountingOutputCommitter countingOutputCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v1Out_1");
        CountingOutputCommitter countingOutputCommitter2 = (CountingOutputCommitter) vertex.getOutputCommitter("v1Out_2");
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        Assert.assertEquals(1L, countingOutputCommitter.initCounter);
        Assert.assertEquals(1L, countingOutputCommitter.setupCounter);
        Assert.assertEquals(0L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(0L, countingOutputCommitter.abortCounter);
        Assert.assertEquals(1L, countingOutputCommitter2.initCounter);
        Assert.assertEquals(1L, countingOutputCommitter2.setupCounter);
        Assert.assertEquals(0L, countingOutputCommitter2.commitCounter);
        Assert.assertEquals(0L, countingOutputCommitter2.abortCounter);
    }

    /**
     * With commit-all-outputs-on-dag-success disabled, completing the vertex's only task must
     * put the vertex into COMMITTING. It stays there while either output commit is still
     * blocked and reaches SUCCEEDED only after both are released. Each committer must have
     * been initialized, set up and committed exactly once and never aborted.
     */
    @Test(timeout = 5000)
    public void testVertexCommit_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        vertexImpl.handle(new VertexEventTaskCompleted(vertexImpl.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertexImpl.getState());
        // Keep a handle on each committer so the counters of both can be checked at the end.
        CountingOutputCommitter firstCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("v1Out_1");
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("v1Out_2");
        firstCommitter.unblockCommit();
        waitForCommitCompleted(vertexImpl, "v1Out_1");
        // The second output is still blocked, so the vertex must not have finished yet.
        Assert.assertEquals(VertexState.COMMITTING, vertexImpl.getState());
        secondCommitter.unblockCommit();
        waitUntil(vertexImpl, VertexState.SUCCEEDED);
        Assert.assertNull(vertexImpl.getTerminationCause());
        Assert.assertTrue(vertexImpl.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertexImpl.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(vertexImpl.getVertexId(), 1);
        Assert.assertEquals(1L, firstCommitter.initCounter);
        Assert.assertEquals(1L, firstCommitter.setupCounter);
        Assert.assertEquals(1L, firstCommitter.commitCounter);
        Assert.assertEquals(0L, firstCommitter.abortCounter);
        Assert.assertEquals(1L, secondCommitter.initCounter);
        Assert.assertEquals(1L, secondCommitter.setupCounter);
        Assert.assertEquals(1L, secondCommitter.commitCounter);
        Assert.assertEquals(0L, secondCommitter.abortCounter);
    }

    /**
     * Vertex-level commit where unblocking the first output ({@code v1Out_1}) drives the vertex to
     * FAILED with {@code COMMIT_FAILURE}. The first committer must have committed and aborted once;
     * the second must have been aborted once.
     */
    @Test(timeout = 5000)
    public void testVertexCommitFail1_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): (false, true) presumably makes the first committer's commit fail - confirm.
        setupDAG(createDAGPlan_SingleVertexWith2Committer(false, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        CountingOutputCommitter committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_1");
        committer1.unblockCommit();
        waitUntil(v1, VertexState.FAILED);

        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v1.getTerminationCause());
        Assert.assertTrue(v1.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);

        CountingOutputCommitter committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, committer1.initCounter);
        Assert.assertEquals(1L, committer1.setupCounter);
        Assert.assertEquals(1L, committer1.commitCounter);
        Assert.assertEquals(1L, committer1.abortCounter);
        // The second committer was never unblocked, so its commitCounter is deliberately not asserted.
        Assert.assertEquals(1L, committer2.initCounter);
        Assert.assertEquals(1L, committer2.setupCounter);
        Assert.assertEquals(1L, committer2.abortCounter);
    }

    /**
     * Vertex-level commit where the first output commits, the vertex stays COMMITTING, and
     * unblocking the second output ({@code v1Out_2}) drives the vertex to FAILED with
     * {@code COMMIT_FAILURE}. Both committers must have committed once and been aborted once.
     */
    @Test(timeout = 5000)
    public void testVertexCommitFail2_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): (true, false) presumably makes the second committer's commit fail - confirm.
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, false));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        CountingOutputCommitter committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_1");
        committer1.unblockCommit();
        waitForCommitCompleted(v1, "v1Out_1");
        // First output is done; the vertex keeps COMMITTING until the second one finishes.
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        CountingOutputCommitter committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_2");
        committer2.unblockCommit();
        waitUntil(v1, VertexState.FAILED);

        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v1.getTerminationCause());
        Assert.assertTrue(v1.commitFutures.isEmpty());

        // Check each committer separately (previously both groups hit the same undeclared symbol).
        Assert.assertEquals(1L, committer1.initCounter);
        Assert.assertEquals(1L, committer1.setupCounter);
        Assert.assertEquals(1L, committer1.commitCounter);
        Assert.assertEquals(1L, committer1.abortCounter);
        Assert.assertEquals(1L, committer2.initCounter);
        Assert.assertEquals(1L, committer2.setupCounter);
        Assert.assertEquals(1L, committer2.commitCounter);
        Assert.assertEquals(1L, committer2.abortCounter);
    }

    /**
     * Terminates the DAG with {@code DAG_KILL} while {@code vertex1} is COMMITTING. The vertex must
     * end KILLED with cause {@code DAG_TERMINATED}, the DAG must end KILLED with cause
     * {@code DAG_KILL}, and both committers must have been aborted exactly once.
     */
    @Test(timeout = 5000)
    public void testVertexKilledWhileCommitting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        // Explicit downcast: VertexImpl-only members (e.g. commitFutures) are used below.
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        // Committers are never unblocked: the kill arrives while the commit is still in flight.
        this.dag.handle(new DAGEventTerminateDag(this.dag.getID(), DAGTerminationCause.DAG_KILL, (String) null));
        this.dispatcher.await();

        Assert.assertEquals(VertexState.KILLED, v1.getState());
        Assert.assertTrue(v1.commitFutures.isEmpty());
        Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v1.getTerminationCause());
        Assert.assertEquals(DAGState.KILLED, this.dag.getState());
        Assert.assertEquals(DAGTerminationCause.DAG_KILL, this.dag.getTerminationCause());
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);

        CountingOutputCommitter committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_1");
        CountingOutputCommitter committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, committer1.initCounter);
        Assert.assertEquals(1L, committer1.setupCounter);
        Assert.assertEquals(1L, committer1.abortCounter);
        Assert.assertEquals(1L, committer2.initCounter);
        Assert.assertEquals(1L, committer2.setupCounter);
        Assert.assertEquals(1L, committer2.abortCounter);
    }

    /**
     * Delivers a task-reschedule event while {@code vertex1} is COMMITTING. The vertex must fail
     * with {@code VERTEX_RERUN_IN_COMMITTING}, the DAG must fail with {@code VERTEX_FAILURE}, and
     * both committers must have been aborted exactly once.
     */
    @Test(timeout = 5000)
    public void testVertexRescheduleWhileCommitting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        // Explicit downcast: VertexImpl-only members (e.g. commitFutures) are used below.
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        // Reschedule request (task id built with index 2) arriving mid-commit.
        v1.handle(new VertexEventTaskReschedule(TezTaskID.getInstance(v1.getVertexId(), 2)));
        this.dispatcher.await();

        Assert.assertEquals(VertexState.FAILED, v1.getState());
        Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, v1.getTerminationCause());
        Assert.assertTrue(v1.commitFutures.isEmpty());
        Assert.assertEquals(DAGState.FAILED, this.dag.getState());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, this.dag.getTerminationCause());
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);

        CountingOutputCommitter committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_1");
        CountingOutputCommitter committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, committer1.initCounter);
        Assert.assertEquals(1L, committer1.setupCounter);
        Assert.assertEquals(1L, committer1.abortCounter);
        Assert.assertEquals(1L, committer2.initCounter);
        Assert.assertEquals(1L, committer2.setupCounter);
        Assert.assertEquals(1L, committer2.abortCounter);
    }

    /**
     * Routes a {@code VertexManagerEvent} to {@code vertex1} while it is COMMITTING and expects the
     * vertex to fail with {@code AM_USERCODE_FAILURE}, the DAG to fail with {@code VERTEX_FAILURE},
     * and both committers to have been aborted exactly once.
     */
    @Test(timeout = 5000)
    public void testVertexRouteEventErrorWhileCommitting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): the third flag presumably installs a vertex manager that throws on events - confirm.
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        // Explicit downcast: VertexImpl-only members (e.g. commitFutures) are used below.
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        // Build the routed event step by step: an empty-payload VertexManagerEvent targeting
        // vertex1, attributed to attempt 0 of task 0 of the same vertex.
        TezTaskAttemptID sourceAttempt =
            TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
        EventMetaData sourceInfo = new EventMetaData(
            EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", (String) null, sourceAttempt);
        TezEvent routedEvent =
            new TezEvent(VertexManagerEvent.create("vertex1", ByteBuffer.wrap(new byte[0])), sourceInfo);
        v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(routedEvent)));
        waitUntil(this.dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.FAILED, v1.getState());
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, v1.getTerminationCause());
        Assert.assertTrue(v1.commitFutures.isEmpty());
        Assert.assertEquals(DAGState.FAILED, this.dag.getState());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, this.dag.getTerminationCause());
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);

        CountingOutputCommitter committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_1");
        CountingOutputCommitter committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, committer1.initCounter);
        Assert.assertEquals(1L, committer1.setupCounter);
        Assert.assertEquals(1L, committer1.abortCounter);
        Assert.assertEquals(1L, committer2.initCounter);
        Assert.assertEquals(1L, committer2.setupCounter);
        Assert.assertEquals(1L, committer2.abortCounter);
    }

    /**
     * Sends {@code V_INTERNAL_ERROR} to {@code vertex1} while it is COMMITTING. Both the vertex and
     * the DAG must end in ERROR with cause {@code INTERNAL_ERROR}. Only the init/setup counters of
     * the committers are checked; commit and abort counts are not asserted in this scenario.
     */
    @Test(timeout = 5000)
    public void testVertexInternalErrorWhileCommiting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        // Explicit downcast so the VertexImpl declaration matches the sibling tests.
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v1.getState());

        v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        this.dispatcher.await();

        Assert.assertEquals(VertexState.ERROR, v1.getState());
        Assert.assertEquals(VertexTerminationCause.INTERNAL_ERROR, v1.getTerminationCause());
        Assert.assertEquals(DAGState.ERROR, this.dag.getState());
        Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, this.dag.getTerminationCause());
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);

        CountingOutputCommitter committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_1");
        CountingOutputCommitter committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, committer1.initCounter);
        Assert.assertEquals(1L, committer1.setupCounter);
        Assert.assertEquals(1L, committer2.initCounter);
        Assert.assertEquals(1L, committer2.setupCounter);
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} enabled, all three vertices succeed,
     * the DAG enters COMMITTING, and it reaches SUCCEEDED only after both the {@code v12Out} and
     * {@code v3Out} committers are unblocked. No vertex-level or vertex-group commit events are
     * expected; one DAG-commit-started event is. Each committer must have committed once and never
     * aborted.
     */
    @Test(timeout = 5000)
    public void testDAGCommitSucceeded_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(this.dag, DAGState.COMMITTING);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        v12Committer.unblockCommit();
        // Still COMMITTING: the v3Out committer has not been released yet.
        waitUntil(this.dag, DAGState.COMMITTING);
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(this.dag, DAGState.SUCCEEDED);

        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        Assert.assertNull(this.dag.getTerminationCause());
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v3", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v3", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 1);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Check each committer separately (previously both groups hit the same undeclared symbol).
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(0L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(0L, v3Committer.abortCounter);
    }

    /**
     * DAG-level commit in which the {@code v12Out} commit completes, the DAG stays COMMITTING, and
     * unblocking {@code v3Out} drives the DAG to FAILED with {@code COMMIT_FAILURE}. All vertices
     * remain SUCCEEDED; both committers must have committed once and been aborted once.
     */
    @Test(timeout = 5000)
    public void testDAGCommitFail1_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        // NOTE(review): (true, false) presumably makes the v3Out commit fail - confirm.
        setupDAG(createDAGPlan(true, false));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(this.dag, DAGState.COMMITTING);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        v12Committer.unblockCommit();
        waitForCommitCompleted(this.dag, new DAGImpl.OutputKey("v12Out", "uv12", true));
        // The group output is committed, but v3Out is still pending.
        Assert.assertEquals(DAGState.COMMITTING, this.dag.getState());
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(this.dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 1);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Check each committer separately (previously both groups hit the same undeclared symbol).
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * DAG-level commit in which unblocking {@code v12Out} alone drives the DAG to FAILED with
     * {@code COMMIT_FAILURE}. All vertices remain SUCCEEDED. The {@code v12Out} committer must have
     * committed and aborted once; the {@code v3Out} committer, which is never unblocked, must have
     * been aborted once.
     */
    @Test(timeout = 5000)
    public void testDAGCommitFail2_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        // NOTE(review): (false, true) presumably makes the v12Out commit fail - confirm.
        setupDAG(createDAGPlan(false, true));
        initDAG(this.dag);
        startDAG(this.dag);
        // Explicit downcasts so the VertexImpl declarations match the sibling tests.
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(this.dag, DAGState.COMMITTING);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v12Committer.unblockCommit();
        waitUntil(this.dag, DAGState.FAILED);

        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 1);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        // v3Out was never unblocked, so its commitCounter is deliberately not asserted.
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * Per-vertex commit mode where {@code vertex3}'s own output ({@code v3Out}) is committed first,
     * after which the DAG enters COMMITTING for the vertex-group output ({@code v12Out}) and then
     * succeeds. Expects one group-commit started/finished pair for {@code uv12}, one vertex commit
     * for {@code vertex3} and no DAG-commit-started event. Both committers must have committed
     * once and never aborted.
     */
    @Test(timeout = 5000)
    public void testDAGCommitSucceeded1_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.COMMITTING, v3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        // Release the vertex-level commit first, then the group commit.
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(v3, VertexState.SUCCEEDED);
        waitUntil(this.dag, DAGState.COMMITTING);
        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        v12Committer.unblockCommit();
        waitUntil(this.dag, DAGState.SUCCEEDED);

        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Check each committer separately (previously both groups hit the same undeclared symbol).
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(0L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(0L, v3Committer.abortCounter);
    }

    /**
     * Per-vertex commit mode where the vertex-group output ({@code v12Out}) is released first while
     * {@code vertex3} is still COMMITTING. The DAG must stay RUNNING until {@code v3Out} is also
     * released, and then succeed. Both committers must have committed once and never aborted.
     */
    @Test(timeout = 5000)
    public void testDAGCommitSucceeded2_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.COMMITTING, v3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        v12Committer.unblockCommit();
        // Give the released commit time to run and drain resulting events before checking that
        // the DAG is still RUNNING (vertex3 has not finished committing).
        Thread.sleep(500L);
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(this.dag, DAGState.SUCCEEDED);

        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Check each committer separately (previously both groups hit the same undeclared symbol).
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(0L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(0L, v3Committer.abortCounter);
    }

    /**
     * Per-vertex commit mode with two vertex-group outputs ({@code v12Out1}, {@code v12Out2}) plus
     * {@code vertex3}'s own output ({@code v3Out}). After all three committers are released the DAG
     * must succeed, and each committer must have committed exactly once and never aborted.
     */
    @Test(timeout = 5000)
    public void testDAGCommitSucceeded3_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.COMMITTING, v3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        CountingOutputCommitter v12Committer1 = (CountingOutputCommitter) v1.getOutputCommitter("v12Out1");
        v12Committer1.unblockCommit();
        CountingOutputCommitter v12Committer2 = (CountingOutputCommitter) v1.getOutputCommitter("v12Out2");
        v12Committer2.unblockCommit();
        // Checked immediately after releasing the group outputs; vertex3's commit is still blocked.
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(this.dag, DAGState.SUCCEEDED);

        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Check each committer separately (previously all three groups hit the same undeclared symbol).
        Assert.assertEquals(1L, v12Committer1.initCounter);
        Assert.assertEquals(1L, v12Committer1.setupCounter);
        Assert.assertEquals(1L, v12Committer1.commitCounter);
        Assert.assertEquals(0L, v12Committer1.abortCounter);
        Assert.assertEquals(1L, v12Committer2.initCounter);
        Assert.assertEquals(1L, v12Committer2.setupCounter);
        Assert.assertEquals(1L, v12Committer2.commitCounter);
        Assert.assertEquals(0L, v12Committer2.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(0L, v3Committer.abortCounter);
    }

    /**
     * Per-vertex commit mode where releasing the vertex-group output ({@code v12Out}) drives the
     * DAG to FAILED with {@code COMMIT_FAILURE} while {@code vertex3} is still COMMITTING.
     * {@code vertex3} must end KILLED with {@code OTHER_VERTEX_FAILURE}. The {@code v12Out}
     * committer must have committed and aborted once; the {@code v3Out} committer must have been
     * aborted once.
     */
    @Test(timeout = 5000)
    public void testDAGCommitFail1_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): (false, true) presumably makes the v12Out commit fail - confirm.
        setupDAG(createDAGPlan(false, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl v1 = (VertexImpl) this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.COMMITTING, v3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        v12Committer.unblockCommit();
        waitUntil(this.dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.KILLED, v3.getState());
        Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE, v3.getTerminationCause());
        Assert.assertTrue(v3.commitFutures.isEmpty());
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());
        // Group commit started but never finished.
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        // v3Out was never unblocked, so its commitCounter is deliberately not asserted.
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * Runs with {@code tez.am.commit-all-outputs-on-dag-success} disabled. Once one task on each
     * vertex has succeeded, the "v3Out" committer is unblocked; vertex3 is expected to end FAILED
     * with COMMIT_FAILURE and the DAG FAILED with VERTEX_FAILURE. The "v12Out" committer is never
     * unblocked and is only checked for init/setup/abort.
     */
    @Test(timeout = 5000)
    public void testDAGCommitFail2_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): the two flags presumably select which commit succeeds - confirm against createDAGPlan.
        setupDAG(createDAGPlan(true, false));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl vertex = this.dag.getVertex("vertex1");
        VertexImpl vertex2 = this.dag.getVertex("vertex2");
        VertexImpl vertex3 = this.dag.getVertex("vertex3");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        // Keep a named handle on the committer that is unblocked so its counters can be checked below.
        CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3OutputCommitter.unblockCommit();
        waitUntil(this.dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.FAILED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());

        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // The group committer was never unblocked, so its commitCounter is not asserted.
        CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v12Out");
        Assert.assertEquals(1L, v12OutputCommitter.initCounter);
        Assert.assertEquals(1L, v12OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v12OutputCommitter.abortCounter);

        // The unblocked vertex3 committer ran its commit once and was aborted once.
        Assert.assertEquals(1L, v3OutputCommitter.initCounter);
        Assert.assertEquals(1L, v3OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v3OutputCommitter.commitCounter);
        Assert.assertEquals(1L, v3OutputCommitter.abortCounter);
    }

    /**
     * Runs with {@code tez.am.commit-all-outputs-on-dag-success} disabled. The "v12Out" committer
     * is unblocked first and the test waits for that commit to complete; the "v3Out" committer is
     * unblocked afterwards. vertex3 is expected to end FAILED with COMMIT_FAILURE and the DAG
     * FAILED with VERTEX_FAILURE, with both committers committed once and aborted once.
     */
    @Test(timeout = 5000)
    public void testDAGCommitFail3_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): the two flags presumably select which commit succeeds - confirm against createDAGPlan.
        setupDAG(createDAGPlan(true, false));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl vertex = this.dag.getVertex("vertex1");
        VertexImpl vertex2 = this.dag.getVertex("vertex2");
        VertexImpl vertex3 = this.dag.getVertex("vertex3");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        // Let the group commit finish before the vertex3 commit is released.
        CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v12Out");
        v12OutputCommitter.unblockCommit();
        waitForCommitCompleted(this.dag, new DAGImpl.OutputKey("v12Out", "uv12", true));
        CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3OutputCommitter.unblockCommit();
        waitUntil(this.dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.FAILED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());

        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Both committers were unblocked: each committed once and was aborted once.
        Assert.assertEquals(1L, v12OutputCommitter.initCounter);
        Assert.assertEquals(1L, v12OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v12OutputCommitter.commitCounter);
        Assert.assertEquals(1L, v12OutputCommitter.abortCounter);
        Assert.assertEquals(1L, v3OutputCommitter.initCounter);
        Assert.assertEquals(1L, v3OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v3OutputCommitter.commitCounter);
        Assert.assertEquals(1L, v3OutputCommitter.abortCounter);
    }

    /**
     * Runs with {@code tez.am.commit-all-outputs-on-dag-success} disabled. The "v3Out" committer
     * is unblocked first and vertex3 is expected to reach SUCCEEDED; the "v12Out" committer is
     * unblocked afterwards and the DAG is expected to end FAILED with COMMIT_FAILURE. Both
     * committers are expected to have committed once and been aborted once.
     */
    @Test(timeout = 5000)
    public void testDAGCommitFail4_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): the two flags presumably select which commit succeeds - confirm against createDAGPlan.
        setupDAG(createDAGPlan(false, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl vertex = this.dag.getVertex("vertex1");
        VertexImpl vertex2 = this.dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) this.dag.getVertex("vertex3");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        // Release the vertex3 commit first and wait for the vertex to finish successfully.
        CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3OutputCommitter.unblockCommit();
        waitForCommitCompleted(this.dag, new DAGImpl.OutputKey("v3Out", "vertex3", true));
        waitUntil(vertex3, VertexState.SUCCEEDED);
        Assert.assertTrue(vertex3.commitFutures.isEmpty());

        // Then release the group commit, which is expected to fail the DAG.
        CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v12Out");
        v12OutputCommitter.unblockCommit();
        waitUntil(this.dag, DAGState.FAILED);
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());

        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Both committers were unblocked: each committed once and was aborted once.
        Assert.assertEquals(1L, v12OutputCommitter.initCounter);
        Assert.assertEquals(1L, v12OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v12OutputCommitter.commitCounter);
        Assert.assertEquals(1L, v12OutputCommitter.abortCounter);
        Assert.assertEquals(1L, v3OutputCommitter.initCounter);
        Assert.assertEquals(1L, v3OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v3OutputCommitter.commitCounter);
        Assert.assertEquals(1L, v3OutputCommitter.abortCounter);
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} enabled, waits until the DAG is
     * COMMITTING, delivers an INTERNAL_ERROR event and expects the DAG to end in ERROR with
     * INTERNAL_ERROR as its termination cause. Only the init and setup counters of the two
     * committers are checked here.
     */
    @Test(timeout = 5000)
    public void testDAGInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = dag.getVertex("vertex1");
        VertexImpl v2 = dag.getVertex("vertex2");
        VertexImpl v3 = dag.getVertex("vertex3");

        // Finish the single task of every vertex so the DAG moves on to committing.
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Inject the internal error while the commit is still in flight.
        DAGEvent internalError = new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR);
        dag.handle(internalError);
        waitUntil(dag, DAGState.ERROR);
        Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());

        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
    }

    /** Runs the terminated-while-committing (commit on DAG success) scenario with {@code DAG_KILL}. */
    @Test(timeout = 5000)
    public void testDAGKilledWhileCommitting1_OnDAGSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testDAGTerminatedWhileCommitting1_OnDAGSuccess(cause);
    }

    /** Runs the terminated-while-committing (commit on DAG success) scenario with {@code SERVICE_PLUGIN_ERROR}. */
    @Test(timeout = 5000)
    public void testServiceErrorWhileCommitting1_OnDAGSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testDAGTerminatedWhileCommitting1_OnDAGSuccess(cause);
    }

    /**
     * Shared scenario: with {@code tez.am.commit-all-outputs-on-dag-success} enabled, waits until
     * the DAG is COMMITTING, sends a terminate event carrying the given cause and expects the DAG
     * to reach that cause's finished state while all three vertices stay SUCCEEDED and both
     * committers are aborted once.
     *
     * @param dAGTerminationCause cause placed in the terminate event and expected on the DAG afterwards
     */
    private void _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause dAGTerminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = dag.getVertex("vertex1");
        VertexImpl v2 = dag.getVertex("vertex2");
        VertexImpl v3 = dag.getVertex("vertex3");

        // Finish the single task of every vertex so the DAG moves on to committing.
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Terminate the DAG while its commit is still in flight; no diagnostics are supplied.
        final String noDiagnostics = null;
        dag.handle(new DAGEventTerminateDag(dag.getID(), dAGTerminationCause, noDiagnostics));
        waitUntil(dag, dAGTerminationCause.getFinishedState());

        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
        Assert.assertEquals(dAGTerminationCause, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /** Runs the terminated-while-committing (commit on vertex success) scenario with {@code DAG_KILL}. */
    @Test(timeout = 5000)
    public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testDAGTerminatedWhileCommitting1_OnVertexSuccess(cause);
    }

    /** Runs the terminated-while-committing (commit on vertex success) scenario with {@code SERVICE_PLUGIN_ERROR}. */
    @Test(timeout = 5000)
    public void testServiceErrorWhileCommitting1_OnVertexSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testDAGTerminatedWhileCommitting1_OnVertexSuccess(cause);
    }

    /**
     * Shared scenario: with {@code tez.am.commit-all-outputs-on-dag-success} disabled, unblocks
     * the "v3Out" committer, waits until the DAG is COMMITTING, then sends a terminate event with
     * the given cause. The DAG is expected to reach that cause's finished state with all three
     * vertices SUCCEEDED; the "v12Out" committer is never unblocked and is only checked for
     * init/setup/abort.
     *
     * @param dAGTerminationCause cause placed in the terminate event and expected on the DAG afterwards
     */
    private void _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause dAGTerminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true));
        initDAG(this.dag);
        startDAG(this.dag);
        VertexImpl vertex = this.dag.getVertex("vertex1");
        VertexImpl vertex2 = this.dag.getVertex("vertex2");
        VertexImpl vertex3 = this.dag.getVertex("vertex3");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        // Keep a named handle on the committer that is unblocked so its counters can be checked below.
        CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3OutputCommitter.unblockCommit();
        waitUntil(this.dag, DAGState.COMMITTING);

        // Terminate the DAG while the group commit is still pending.
        this.dag.handle(new DAGEventTerminateDag(this.dag.getID(), dAGTerminationCause, (String) null));
        waitUntil(this.dag, dAGTerminationCause.getFinishedState());

        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex3.getState());
        Assert.assertEquals(dAGTerminationCause.getFinishedState(), this.dag.getState());
        Assert.assertEquals(dAGTerminationCause, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());

        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // The group committer was never unblocked, so its commitCounter is not asserted.
        CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v12Out");
        Assert.assertEquals(1L, v12OutputCommitter.initCounter);
        Assert.assertEquals(1L, v12OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v12OutputCommitter.abortCounter);

        // The unblocked vertex3 committer ran its commit once and was aborted once.
        Assert.assertEquals(1L, v3OutputCommitter.initCounter);
        Assert.assertEquals(1L, v3OutputCommitter.setupCounter);
        Assert.assertEquals(1L, v3OutputCommitter.commitCounter);
        Assert.assertEquals(1L, v3OutputCommitter.abortCounter);
    }

    /** Runs the terminated-while-running (commit on vertex success) scenario with {@code DAG_KILL}. */
    @Test(timeout = 5000)
    public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testDAGKilledWhileRunning_OnVertexSuccess(cause);
    }

    /** Runs the terminated-while-running (commit on vertex success) scenario with {@code SERVICE_PLUGIN_ERROR}. */
    @Test(timeout = 5000)
    public void testServiceErrorWhileRunning_OnVertexSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testDAGKilledWhileRunning_OnVertexSuccess(cause);
    }

    /**
     * Shared scenario: with {@code tez.am.commit-all-outputs-on-dag-success} disabled, sends a
     * terminate event with the given cause while the DAG is RUNNING and vertex3 is COMMITTING
     * (no committer is unblocked). vertex3 is expected to end KILLED with DAG_TERMINATED and the
     * DAG to reach the cause's finished state, with both committers aborted once.
     *
     * @param dAGTerminationCause cause placed in the terminate event and expected on the DAG afterwards
     */
    private void _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause dAGTerminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true));
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = dag.getVertex("vertex1");
        VertexImpl v2 = dag.getVertex("vertex2");
        VertexImpl v3 = dag.getVertex("vertex3");

        // Finish the single task of every vertex; vertex3 then sits in COMMITTING.
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, v3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Terminate the DAG while it is still running; no diagnostics are supplied.
        final String noDiagnostics = null;
        dag.handle(new DAGEventTerminateDag(dag.getID(), dAGTerminationCause, noDiagnostics));
        waitUntil(dag, dAGTerminationCause.getFinishedState());

        Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
        Assert.assertEquals(VertexState.KILLED, v3.getState());
        Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v3.getTerminationCause());
        Assert.assertTrue(v3.commitFutures.isEmpty());
        Assert.assertEquals(dAGTerminationCause.getFinishedState(), dag.getState());
        Assert.assertEquals(dAGTerminationCause, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} enabled, reschedules a task of vertex1
     * while the DAG is COMMITTING. The DAG and vertex1 are expected to pass through TERMINATING
     * and, once the rescheduled task is reported KILLED, to end FAILED with
     * VERTEX_RERUN_IN_COMMITTING as the termination cause on both.
     */
    @Test(timeout = 5000)
    public void testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl v2 = dag.getVertex("vertex2");
        VertexImpl v3 = dag.getVertex("vertex3");

        // Finish the single task of every vertex so the DAG moves on to committing.
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Reschedule a vertex1 task (id built with index 1) while the DAG commit is in flight.
        TezTaskID rescheduledTaskId = TezTaskID.getInstance(v1.getVertexId(), 1);
        v1.handle(new VertexEventTaskReschedule(rescheduledTaskId));
        waitUntil(dag, DAGState.TERMINATING);
        waitUntil(v1, VertexState.TERMINATING);

        // Report the rescheduled task as killed so the vertex and the DAG can finish terminating.
        v1.handle(new VertexEventTaskCompleted(rescheduledTaskId, TaskState.KILLED));
        waitUntil(dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.FAILED, v1.getState());
        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, v1.getTerminationCause());
        Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        // vertex1 is expected to have two finished events recorded.
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 2);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} enabled, waits until the DAG is
     * COMMITTING, delivers an INTERNAL_ERROR event and expects the DAG to end in ERROR with
     * INTERNAL_ERROR as its termination cause. Both committers are additionally expected to have
     * been aborted once.
     */
    @Test(timeout = 5000)
    public void testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = dag.getVertex("vertex1");
        VertexImpl v2 = dag.getVertex("vertex2");
        VertexImpl v3 = dag.getVertex("vertex3");

        // Finish the single task of every vertex so the DAG moves on to committing.
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Inject the internal error while the commit is still in flight.
        DAGEvent internalError = new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR);
        dag.handle(internalError);
        waitUntil(dag, DAGState.ERROR);
        Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());

        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} disabled and the history handler's
     * {@code failVertexGroupCommitFinishedEvent} flag set, unblocks the "v12Out" committer and
     * expects the DAG to end FAILED with RECOVERY_FAILURE, vertex3 KILLED with
     * OTHER_VERTEX_FAILURE, and no vertex-group-commit-finished event recorded for "uv12".
     */
    @Test(timeout = 5000)
    public void testVertexGroupCommitFinishedEventFail_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true));
        // Arm the failure flag before the DAG is initialised and started.
        historyEventHandler.failVertexGroupCommitFinishedEvent = true;
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = dag.getVertex("vertex1");
        VertexImpl v2 = dag.getVertex("vertex2");
        VertexImpl v3 = dag.getVertex("vertex3");

        // Finish the single task of every vertex.
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));

        CountingOutputCommitter v12Committer = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");

        // Only the group commit is released; the vertex3 committer stays blocked.
        v12Committer.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());
        Assert.assertEquals(VertexState.KILLED, v3.getState());
        Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE, v3.getTerminationCause());
        Assert.assertTrue(v3.commitFutures.isEmpty());

        Assert.assertEquals(1L, v12Committer.initCounter);
        Assert.assertEquals(1L, v12Committer.setupCounter);
        Assert.assertEquals(1L, v12Committer.commitCounter);
        Assert.assertEquals(1L, v12Committer.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * Checks the DAG outcome when the history event handler is told to fail the
     * DAG-commit-started event ({@code failDAGCommitStartedEvent = true}) while
     * {@code tez.am.commit-all-outputs-on-dag-success} is enabled.
     *
     * <p>After the single task of each of the three vertices is reported as
     * SUCCEEDED, the test expects the DAG to reach {@code FAILED} with
     * {@code RECOVERY_FAILURE}, no commit futures left on the DAG, and both the
     * "v12Out" and "v3Out" committers to have been initialized, set up and
     * aborted once without ever committing.
     *
     * @throws Exception if any fixture helper fails
     */
    @Test(timeout = 5000)
    public void testDAGCommitStartedEventFail_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));
        // Must be set before the DAG runs so the commit-started event is rejected.
        this.historyEventHandler.failDAGCommitStartedEvent = true;
        initDAG(this.dag);
        startDAG(this.dag);

        VertexImpl v1 = this.dag.getVertex("vertex1");
        VertexImpl v2 = this.dag.getVertex("vertex2");
        VertexImpl v3 = this.dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));

        waitUntil(this.dag, DAGState.FAILED);
        Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());

        // History events; the second argument is presumably the expected event count
        // (confirm against the handler's verify* helpers).
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        CountingOutputCommitter v12OutCommitter = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3OutCommitter = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");

        Assert.assertEquals(1L, v12OutCommitter.initCounter);
        Assert.assertEquals(1L, v12OutCommitter.setupCounter);
        Assert.assertEquals(0L, v12OutCommitter.commitCounter);
        Assert.assertEquals(1L, v12OutCommitter.abortCounter);

        Assert.assertEquals(1L, v3OutCommitter.initCounter);
        Assert.assertEquals(1L, v3OutCommitter.setupCounter);
        // Previously this line re-checked the "v12Out" committer, leaving the
        // "v3Out" commit counter unverified.
        Assert.assertEquals(0L, v3OutCommitter.commitCounter);
        Assert.assertEquals(1L, v3OutCommitter.abortCounter);
    }

    /**
     * Runs the shared commit-cancellation scenario with the DAG terminated
     * using {@code DAGTerminationCause.DAG_KILL}.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test(timeout = 5000)
    public void testCommitCanceled_OnDAGSuccess() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testCommitCanceled_OnDAGSuccess(cause);
    }

    /**
     * Runs the shared commit-cancellation scenario with the DAG terminated
     * using {@code DAGTerminationCause.SERVICE_PLUGIN_ERROR}.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test(timeout = 5000)
    public void testCommitCanceled_OnDAGSuccess2() throws Exception {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testCommitCanceled_OnDAGSuccess(cause);
    }

    /**
     * Shared scenario: terminate the DAG while it is in {@code COMMITTING} and
     * check that pending commits are dropped and the committers are aborted.
     *
     * <p>A {@code ControlledThreadPoolExecutor(1)} is installed as the app
     * context's exec service before the DAG is initialized. Once all three
     * vertices report their task as SUCCEEDED the DAG is expected to enter
     * {@code COMMITTING} with two commit futures outstanding; a
     * {@code DAGEventTerminateDag} carrying the given cause is then delivered.
     *
     * @param dAGTerminationCause cause placed on the terminate event; its
     *        {@code getFinishedState()} is the DAG state awaited afterwards
     * @throws Exception if any fixture helper fails
     */
    private void _testCommitCanceled_OnDAGSuccess(DAGTerminationCause dAGTerminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true));

        // Swap in the controlled executor; presumably it holds back the commit
        // tasks so they are still pending at termination time (confirm against
        // ControlledThreadPoolExecutor).
        rawExecutor = new ControlledThreadPoolExecutor(1);
        execService = MoreExecutors.listeningDecorator(rawExecutor);
        ((AppContext) Mockito.doReturn(execService).when(appContext)).getExecService();

        initDAG(dag);
        startDAG(dag);

        final VertexImpl v1 = dag.getVertex("vertex1");
        final VertexImpl v2 = dag.getVertex("vertex2");
        final VertexImpl v3 = dag.getVertex("vertex3");
        v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
        v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), TaskState.SUCCEEDED));

        waitUntil(dag, DAGState.COMMITTING);
        Assert.assertEquals(2L, dag.commitFutures.size());

        // Terminate mid-commit and wait for the cause's final state.
        dag.handle(new DAGEventTerminateDag(dag.getID(), dAGTerminationCause, (String) null));
        waitUntil(dag, dAGTerminationCause.getFinishedState());
        Assert.assertEquals(dAGTerminationCause, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Vertex-group and per-vertex history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        // DAG-level history events.
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        final CountingOutputCommitter v12OutCommitter =
                (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        final CountingOutputCommitter v3OutCommitter =
                (CountingOutputCommitter) v3.getOutputCommitter("v3Out");

        Assert.assertEquals(1L, v12OutCommitter.initCounter);
        Assert.assertEquals(1L, v12OutCommitter.setupCounter);
        Assert.assertEquals(0L, v12OutCommitter.commitCounter);
        Assert.assertEquals(1L, v12OutCommitter.abortCounter);

        Assert.assertEquals(1L, v3OutCommitter.initCounter);
        Assert.assertEquals(1L, v3OutCommitter.setupCounter);
        Assert.assertEquals(0L, v3OutCommitter.commitCounter);
        Assert.assertEquals(1L, v3OutCommitter.abortCounter);
    }
}
