package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexRecovery.class */
public class TestVertexRecovery {
    private static final Log LOG = LogFactory.getLog(TestVertexRecovery.class);
    private DrainDispatcher dispatcher;
    private AppContext mockAppContext;
    private DAGImpl dag;
    private VertexEventHanlder vertexEventHandler;
    private TaskEventHandler taskEventHandler;
    private ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    private TezDAGID dagId = TezDAGID.getInstance(this.appId, 1);
    private String user = "user";
    private long initRequestedTime = 100;
    private long initedTime = this.initRequestedTime + 100;

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexRecovery$TaskAttemptEventHandler.class */
    class TaskAttemptEventHandler implements EventHandler<TaskAttemptEvent> {
        TaskAttemptEventHandler() {
        }

        public void handle(TaskAttemptEvent taskAttemptEvent) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexRecovery$TaskEventHandler.class */
    public class TaskEventHandler implements EventHandler<TaskEvent> {
        private List<TaskEvent> events = new ArrayList();

        TaskEventHandler() {
        }

        public void handle(TaskEvent taskEvent) {
            this.events.add(taskEvent);
            TestVertexRecovery.this.dag.getVertex(taskEvent.getTaskID().getVertexID()).getTask(taskEvent.getTaskID()).handle(taskEvent);
        }

        public List<TaskEvent> getEvents() {
            return this.events;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexRecovery$VertexEventHanlder.class */
    class VertexEventHanlder implements EventHandler<VertexEvent> {
        private List<VertexEvent> events = new ArrayList();

        VertexEventHanlder() {
        }

        public void handle(VertexEvent vertexEvent) {
            this.events.add(vertexEvent);
            TestVertexRecovery.this.dag.getVertex(vertexEvent.getVertexId()).handle(vertexEvent);
        }

        public List<VertexEvent> getEvents() {
            return this.events;
        }
    }

    private DAGProtos.DAGPlan createDAGPlan() {
        return DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output").build()).setName("outputx").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output").build()).setName("outputx").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    @Before
    public void setUp() throws IOException {
        this.dispatcher = new DrainDispatcher();
        this.dispatcher.register(DAGEventType.class, (EventHandler) Mockito.mock(EventHandler.class));
        this.vertexEventHandler = new VertexEventHanlder();
        this.dispatcher.register(VertexEventType.class, this.vertexEventHandler);
        this.taskEventHandler = new TaskEventHandler();
        this.dispatcher.register(TaskEventType.class, this.taskEventHandler);
        this.dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventHandler());
        this.dispatcher.init(new Configuration());
        this.dispatcher.start();
        this.mockAppContext = (AppContext) Mockito.mock(AppContext.class, Mockito.RETURNS_DEEP_STUBS);
        this.dag = new DAGImpl(this.dagId, new Configuration(), createDAGPlan(), this.dispatcher.getEventHandler(), (TaskAttemptListener) Mockito.mock(TaskAttemptListener.class), new Credentials(), new SystemClock(), this.user, (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.mockAppContext);
        Mockito.when(this.mockAppContext.getCurrentDAG()).thenReturn(this.dag);
        this.dag.handle(new DAGEvent(this.dagId, DAGEventType.DAG_INIT));
        LOG.info("finish setUp");
    }

    @Test
    public void testRecovery_Desired_SUCCEEDED() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        Assert.assertEquals(VertexState.INITED, vertexImpl.restoreFromEvent(new VertexInitializedEvent(vertexImpl.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertexImpl.getTotalTasks(), "", (Map) null)));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(vertexImpl.numTasks, vertexImpl.succeededTaskCount);
        Assert.assertEquals(vertexImpl.numTasks, vertexImpl.completedTaskCount);
        assertTaskRecoveredEventSent(vertexImpl);
        Assert.assertEquals(VertexState.NEW, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(0L, this.vertexEventHandler.getEvents().size());
    }

    @Test
    public void testRecovery_Desired_FAILED() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        Assert.assertEquals(VertexState.INITED, vertexImpl.restoreFromEvent(new VertexInitializedEvent(vertexImpl.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertexImpl.getTotalTasks(), "", (Map) null)));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(vertexImpl.numTasks, vertexImpl.failedTaskCount);
        Assert.assertEquals(0L, vertexImpl.completedTaskCount);
        assertTaskRecoveredEventSent(vertexImpl);
        Assert.assertEquals(VertexState.NEW, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(0L, this.vertexEventHandler.getEvents().size());
    }

    @Test
    public void testRecovery_Desired_KILLED() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        Assert.assertEquals(VertexState.INITED, vertexImpl.restoreFromEvent(new VertexInitializedEvent(vertexImpl.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertexImpl.getTotalTasks(), "", (Map) null)));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.KILLED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
        Assert.assertEquals(vertexImpl.numTasks, vertexImpl.killedTaskCount);
        Assert.assertEquals(0L, vertexImpl.completedTaskCount);
        assertTaskRecoveredEventSent(vertexImpl);
        Assert.assertEquals(VertexState.NEW, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(0L, this.vertexEventHandler.getEvents().size());
    }

    @Test
    public void testRecovery_Desired_ERROR() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        Assert.assertEquals(VertexState.INITED, vertexImpl.restoreFromEvent(new VertexInitializedEvent(vertexImpl.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertexImpl.getTotalTasks(), "", (Map) null)));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.ERROR));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.ERROR, vertexImpl.getState());
        Assert.assertEquals(vertexImpl.numTasks, vertexImpl.failedTaskCount);
        Assert.assertEquals(0L, vertexImpl.completedTaskCount);
        assertTaskRecoveredEventSent(vertexImpl);
        Assert.assertEquals(VertexState.NEW, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(0L, this.vertexEventHandler.getEvents().size());
    }

    private TezEvent createTezEvent() {
        return new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", (String) null, (TezTaskAttemptID) null));
    }

    @Test
    public void testRecovery_New_Desired_RUNNING() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        Assert.assertEquals(VertexState.NEW, vertexImpl.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        Assert.assertEquals(1L, vertexImpl.recoveredEvents.size());
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl.recoveredEvents.size());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        assertOutputCommitters(vertexImpl);
        Assert.assertEquals(VertexState.RECOVERING, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(1L, r0.numRecoveredSourceVertices);
        Assert.assertEquals(1L, r0.numInitedSourceVertices);
        Assert.assertEquals(1L, r0.numStartedSourceVertices);
        Assert.assertEquals(1L, r0.getDistanceFromRoot());
    }

    private void assertTaskRecoveredEventSent(VertexImpl vertexImpl) {
        int i = 0;
        Iterator<TaskEvent> it = this.taskEventHandler.getEvents().iterator();
        while (it.hasNext()) {
            TaskEventRecoverTask taskEventRecoverTask = (TaskEvent) it.next();
            if (taskEventRecoverTask.getType() == TaskEventType.T_RECOVER && taskEventRecoverTask.getTaskID().getVertexID().equals(vertexImpl.getVertexId())) {
                i++;
            }
        }
        Assert.assertEquals("expect " + vertexImpl.getTotalTasks() + " TaskEventTaskRecover sent for vertex:" + vertexImpl.getVertexId() + "but actuall sent " + i, vertexImpl.getTotalTasks(), i);
    }

    private void assertOutputCommitters(VertexImpl vertexImpl) {
        Assert.assertTrue(vertexImpl.getOutputCommitters() != null);
        Iterator it = vertexImpl.getOutputCommitters().values().iterator();
        while (it.hasNext()) {
            TestVertexImpl.CountingOutputCommitter countingOutputCommitter = (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) it.next());
            Assert.assertEquals(0L, countingOutputCommitter.abortCounter);
            Assert.assertEquals(0L, countingOutputCommitter.commitCounter);
            Assert.assertEquals(1L, countingOutputCommitter.initCounter);
            Assert.assertEquals(1L, countingOutputCommitter.setupCounter);
        }
    }

    private void restoreFromInitializedEvent(VertexImpl vertexImpl) {
        long j = 100 + 100;
        Assert.assertEquals(VertexState.INITED, vertexImpl.restoreFromEvent(new VertexInitializedEvent(vertexImpl.getVertexId(), "vertex1", 100L, j, vertexImpl.getTotalTasks(), "", (Map) null)));
        Assert.assertEquals(vertexImpl.getTotalTasks(), vertexImpl.getTasks().size());
        Assert.assertEquals(100L, vertexImpl.initTimeRequested);
        Assert.assertEquals(j, vertexImpl.initedTime);
    }

    @Test
    public void testRecovery_Inited_Desired_RUNNING() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        restoreFromInitializedEvent(vertexImpl);
        Assert.assertEquals(VertexState.INITED, vertexImpl.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl.recoveredEvents.size());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        assertTaskRecoveredEventSent(vertexImpl);
        assertOutputCommitters(vertexImpl);
        Assert.assertEquals(VertexState.RECOVERING, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(1L, r0.numRecoveredSourceVertices);
        Assert.assertEquals(1L, r0.numInitedSourceVertices);
        Assert.assertEquals(1L, r0.numStartedSourceVertices);
        Assert.assertEquals(1L, r0.getDistanceFromRoot());
    }

    @Test
    public void testRecovery_Started_Desired_RUNNING() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        restoreFromInitializedEvent(vertexImpl);
        long j = this.initedTime + 100;
        long j2 = j + 100;
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.restoreFromEvent(new VertexStartedEvent(vertexImpl.getVertexId(), j, j2)));
        Assert.assertEquals(j, vertexImpl.startTimeRequested);
        Assert.assertEquals(j2, vertexImpl.startedTime);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        Assert.assertEquals(1L, vertexImpl.recoveredEvents.size());
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl.recoveredEvents.size());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        assertTaskRecoveredEventSent(vertexImpl);
        assertOutputCommitters(vertexImpl);
        Assert.assertEquals(VertexState.RECOVERING, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(1L, r0.numRecoveredSourceVertices);
        Assert.assertEquals(1L, r0.numInitedSourceVertices);
        Assert.assertEquals(1L, r0.numStartedSourceVertices);
        Assert.assertEquals(1L, r0.getDistanceFromRoot());
    }

    @Test
    public void testRecovery_Finished_Desired_RUNNING() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        restoreFromInitializedEvent(vertexImpl);
        long j = this.initedTime + 100;
        long j2 = j + 100;
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.restoreFromEvent(new VertexStartedEvent(vertexImpl.getVertexId(), j, j2)));
        long j3 = j2 + 100;
        VertexState restoreFromEvent = vertexImpl.restoreFromEvent(new VertexFinishedEvent(vertexImpl.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, j, j2, j3, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), (Map) null));
        Assert.assertEquals(j3, vertexImpl.finishTime);
        Assert.assertEquals(VertexState.SUCCEEDED, restoreFromEvent);
        Assert.assertEquals(false, Boolean.valueOf(vertexImpl.recoveryCommitInProgress));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl.recoveredEvents.size());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        assertTaskRecoveredEventSent(vertexImpl);
        assertOutputCommitters(vertexImpl);
        Assert.assertEquals(VertexState.RECOVERING, this.dag.getVertex("vertex3").getState());
        Assert.assertEquals(1L, r0.numRecoveredSourceVertices);
        Assert.assertEquals(1L, r0.numInitedSourceVertices);
        Assert.assertEquals(1L, r0.numStartedSourceVertices);
        Assert.assertEquals(1L, r0.getDistanceFromRoot());
    }

    @Test
    public void testRecovery_RecoveringFromNew() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, vertexImpl.getTasks().size());
        assertOutputCommitters(vertexImpl);
        VertexImpl vertexImpl2 = (VertexImpl) this.dag.getVertex("vertex3");
        Assert.assertEquals(VertexState.NEW, vertexImpl2.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl2.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        Assert.assertEquals(1L, vertexImpl2.recoveredEvents.size());
        Assert.assertEquals(VertexState.RECOVERING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        VertexImpl vertex = this.dag.getVertex("vertex2");
        vertex.handle(new VertexEventRecoverVertex(vertex.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertex.getState());
        Assert.assertNull(vertex.getOutputCommitters());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(2L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        Assert.assertEquals(0L, vertexImpl2.recoveredEvents.size());
        assertOutputCommitters(vertexImpl2);
    }

    @Test
    public void testRecovery_RecoveringFromInited() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        restoreFromInitializedEvent(vertexImpl);
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(vertexImpl.getTotalTasks(), vertexImpl.getTasks().size());
        assertOutputCommitters(vertexImpl);
        VertexImpl vertexImpl2 = (VertexImpl) this.dag.getVertex("vertex3");
        Assert.assertEquals(VertexState.NEW, vertexImpl2.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl2.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        Assert.assertEquals(1L, vertexImpl2.recoveredEvents.size());
        Assert.assertEquals(VertexState.INITED, vertexImpl2.restoreFromEvent(new VertexInitializedEvent(vertexImpl2.getVertexId(), "vertex3", this.initRequestedTime, this.initedTime, 2, "", (Map) null)));
        Assert.assertEquals(VertexState.RECOVERING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        VertexImpl vertexImpl3 = (VertexImpl) this.dag.getVertex("vertex2");
        restoreFromInitializedEvent(vertexImpl3);
        vertexImpl3.handle(new VertexEventRecoverVertex(vertexImpl3.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(2L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        Assert.assertEquals(0L, vertexImpl2.recoveredEvents.size());
        assertOutputCommitters(vertexImpl2);
        assertTaskRecoveredEventSent(vertexImpl);
        assertTaskRecoveredEventSent(vertexImpl3);
        assertTaskRecoveredEventSent(vertexImpl2);
    }

    @Test
    public void testRecovery_RecoveringFromRunning() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        restoreFromInitializedEvent(vertexImpl);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.restoreFromEvent(new VertexStartedEvent(vertexImpl.getVertexId(), this.initRequestedTime + 100, this.initRequestedTime + 200)));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, vertexImpl.getTasks().size());
        assertOutputCommitters(vertexImpl);
        VertexImpl vertexImpl2 = (VertexImpl) this.dag.getVertex("vertex3");
        Assert.assertEquals(VertexState.NEW, vertexImpl2.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl2.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        Assert.assertEquals(1L, vertexImpl2.recoveredEvents.size());
        Assert.assertEquals(VertexState.INITED, vertexImpl2.restoreFromEvent(new VertexInitializedEvent(vertexImpl2.getVertexId(), "vertex3", this.initRequestedTime, this.initedTime, vertexImpl2.getTotalTasks(), "", (Map) null)));
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.restoreFromEvent(new VertexStartedEvent(vertexImpl2.getVertexId(), this.initRequestedTime + 100, this.initRequestedTime + 200)));
        Assert.assertEquals(VertexState.RECOVERING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        VertexImpl vertexImpl3 = (VertexImpl) this.dag.getVertex("vertex2");
        Assert.assertEquals(VertexState.INITED, vertexImpl3.restoreFromEvent(new VertexInitializedEvent(vertexImpl3.getVertexId(), "vertex2", this.initRequestedTime, this.initedTime, vertexImpl3.getTotalTasks(), "", (Map) null)));
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.restoreFromEvent(new VertexStartedEvent(vertexImpl3.getVertexId(), this.initRequestedTime + 100, this.initRequestedTime + 200)));
        vertexImpl3.handle(new VertexEventRecoverVertex(vertexImpl3.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(2L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        Assert.assertEquals(0L, vertexImpl2.recoveredEvents.size());
        assertOutputCommitters(vertexImpl2);
        assertTaskRecoveredEventSent(vertexImpl);
        assertTaskRecoveredEventSent(vertexImpl3);
        assertTaskRecoveredEventSent(vertexImpl2);
    }

    @Test
    public void testRecovery_RecoveringFromSUCCEEDED() {
        VertexImpl vertexImpl = (VertexImpl) this.dag.getVertex("vertex1");
        restoreFromInitializedEvent(vertexImpl);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.restoreFromEvent(new VertexStartedEvent(vertexImpl.getVertexId(), this.initRequestedTime + 100, this.initRequestedTime + 200)));
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.restoreFromEvent(new VertexFinishedEvent(vertexImpl.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, this.initRequestedTime + 300, this.initRequestedTime + 400, this.initRequestedTime + 500, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), (Map) null)));
        vertexImpl.handle(new VertexEventRecoverVertex(vertexImpl.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, vertexImpl.getTasks().size());
        assertOutputCommitters(vertexImpl);
        VertexImpl vertexImpl2 = (VertexImpl) this.dag.getVertex("vertex3");
        Assert.assertEquals(VertexState.NEW, vertexImpl2.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(vertexImpl2.getVertexId(), Lists.newArrayList(new TezEvent[]{createTezEvent()}))));
        Assert.assertEquals(1L, vertexImpl2.recoveredEvents.size());
        restoreFromInitializedEvent(vertexImpl2);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.restoreFromEvent(new VertexStartedEvent(vertexImpl2.getVertexId(), this.initRequestedTime + 100, this.initRequestedTime + 200)));
        Assert.assertEquals(VertexState.RECOVERING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        VertexImpl vertexImpl3 = (VertexImpl) this.dag.getVertex("vertex2");
        Assert.assertEquals(VertexState.INITED, vertexImpl3.restoreFromEvent(new VertexInitializedEvent(vertexImpl3.getVertexId(), "vertex2", this.initRequestedTime, this.initedTime, vertexImpl3.getTotalTasks(), "", (Map) null)));
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.restoreFromEvent(new VertexStartedEvent(vertexImpl3.getVertexId(), this.initRequestedTime + 100, this.initRequestedTime + 200)));
        vertexImpl3.handle(new VertexEventRecoverVertex(vertexImpl3.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(2L, vertexImpl2.numRecoveredSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numInitedSourceVertices);
        Assert.assertEquals(2L, vertexImpl2.numStartedSourceVertices);
        Assert.assertEquals(1L, vertexImpl2.getDistanceFromRoot());
        Assert.assertEquals(0L, vertexImpl2.recoveredEvents.size());
        assertOutputCommitters(vertexImpl2);
        assertTaskRecoveredEventSent(vertexImpl);
        assertTaskRecoveredEventSent(vertexImpl3);
        assertTaskRecoveredEventSent(vertexImpl2);
    }
}
