package org.apache.tez.analyzer.plugins;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.LinkedList;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

/* loaded from: input_file:org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.class */
/**
 * Analyzer that compares shuffle/merge time against the rest of each task attempt's run time.
 *
 * <p>For every task attempt of every vertex, one CSV record is emitted per counter group found in
 * the attempt's {@code REDUCE_INPUT_GROUPS} counters, provided that:
 * <ul>
 *   <li>the group value is positive,</li>
 *   <li>the matching {@code REDUCE_INPUT_RECORDS} value is positive, and</li>
 *   <li>that records value is greater than the configured minimum
 *       ({@code tez.shuffle-time-analyzer.shuffle.min.records}, default 10000).</li>
 * </ul>
 *
 * <p>When {@code MERGE_PHASE_TIME} is available, the "real" task time is taken as the total
 * attempt time minus the merge phase time. If that fraction of the total is below
 * {@code tez.shuffle-time-analyzer.real-work.done.ratio} (default 0.5), a diagnostic comment is
 * attached to the record.
 */
public class ShuffleTimeAnalyzer extends TezAnalyzerBase implements Analyzer {
    private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
    private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
    private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
    private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;

    /** Comment attached when the non-merge share of the attempt time is below the threshold. */
    private static final String SHUFFLE_OVERHEAD_COMMENT = "Time taken in shuffle is more than the actual work being done in task.  Check if source/destination machine is a slow node. Check if merge phase time is more to understand disk bottlenecks in this node.  Check for skew";

    /** CSV column names; {@link #buildRecord} must emit values in exactly this order. */
    private static final String[] HEADERS = {"vertexName", "taskAttemptId", "Node", "counterGroup", "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME", "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED", "SHUFFLE_BYTES_DISK_DIRECT"};

    private final CSVResult csvResult;
    private final float realWorkDoneRatio;
    private final long minShuffleRecords;

    /**
     * Creates the analyzer and reads its thresholds from the given configuration.
     *
     * @param configuration source of {@code tez.shuffle-time-analyzer.*} settings
     */
    public ShuffleTimeAnalyzer(Configuration configuration) {
        super(configuration);
        this.csvResult = new CSVResult(HEADERS);
        this.realWorkDoneRatio = configuration.getFloat(REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
        this.minShuffleRecords = configuration.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
    }

    /**
     * Walks every task attempt in the DAG and appends qualifying shuffle records to the result.
     *
     * @param dagInfo parsed DAG history to analyze
     * @throws TezException declared by the {@link Analyzer} contract
     */
    @Override // org.apache.tez.analyzer.Analyzer
    public void analyze(DagInfo dagInfo) throws TezException {
        for (VertexInfo vertexInfo : dagInfo.getVertices()) {
            for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
                Map<String, TezCounter> reduceInputGroups =
                        attemptInfo.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.toString());
                if (reduceInputGroups == null) {
                    continue;
                }
                // May legitimately be absent even when REDUCE_INPUT_GROUPS is present; treat a
                // missing map/entry as 0 records instead of failing with an NPE.
                Map<String, TezCounter> reduceInputRecords =
                        attemptInfo.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString());

                for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
                    String counterGroup = entry.getKey();
                    long groupsVal = entry.getValue().getValue();
                    TezCounter recordsCounter =
                            reduceInputRecords == null ? null : reduceInputRecords.get(counterGroup);
                    long recordsVal = recordsCounter == null ? 0L : recordsCounter.getValue();

                    // With recordsVal > 0, "groups/records > 0" is equivalent to groupsVal > 0.
                    if (recordsVal > 0 && groupsVal > 0 && recordsVal > this.minShuffleRecords) {
                        this.csvResult.addRecord(
                                buildRecord(vertexInfo, attemptInfo, counterGroup, groupsVal, recordsVal));
                    }
                }
            }
        }
    }

    /**
     * Builds one CSV row (ordered as {@link #HEADERS}) for a single attempt/counter-group pair.
     *
     * @param vertexInfo vertex owning the attempt
     * @param attemptInfo the task attempt
     * @param counterGroup counter group name shared by the looked-up counters
     * @param groupsVal REDUCE_INPUT_GROUPS value for the group
     * @param recordsVal REDUCE_INPUT_RECORDS value for the group (must be non-zero)
     * @return the row values
     */
    private String[] buildRecord(VertexInfo vertexInfo, TaskAttemptInfo attemptInfo,
            String counterGroup, long groupsVal, long recordsVal) {
        long totalTime = attemptInfo.getTimeTaken();
        String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroup, attemptInfo);

        String comments = "";
        String realWorkTime = "";
        if (!Strings.isNullOrEmpty(mergePhaseTime)) {
            long realWork = totalTime - Long.parseLong(mergePhaseTime);
            // A non-positive total would make the ratio NaN/infinite and trigger a bogus comment.
            if (totalTime > 0 && ((float) realWork) / ((float) totalTime) < this.realWorkDoneRatio) {
                comments = SHUFFLE_OVERHEAD_COMMENT;
            }
            realWorkTime = Long.toString(realWork);
        }

        return new String[] {
            vertexInfo.getVertexName(),
            attemptInfo.getTaskAttemptId(),
            attemptInfo.getNodeId(),
            counterGroup,
            comments,
            Long.toString(groupsVal),
            Long.toString(recordsVal),
            Float.toString(((float) groupsVal) / ((float) recordsVal)),
            getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroup, attemptInfo),
            Long.toString(totalTime),
            getOverheadFromSourceTasks(counterGroup, attemptInfo),
            mergePhaseTime,
            getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroup, attemptInfo),
            realWorkTime,
            getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroup, attemptInfo),
            getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroup, attemptInfo),
            getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, counterGroup, attemptInfo)
        };
    }

    /**
     * Returns LAST_EVENT_RECEIVED minus FIRST_EVENT_RECEIVED for the given counter group.
     *
     * @return the difference as a decimal string, or {@code ""} if either counter is missing
     */
    private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
        String firstEvent = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo);
        String lastEvent = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo);
        if (Strings.isNullOrEmpty(firstEvent) || Strings.isNullOrEmpty(lastEvent)) {
            return "";
        }
        return Long.toString(Long.parseLong(lastEvent) - Long.parseLong(firstEvent));
    }

    /**
     * Looks up a task counter's value for a specific counter group.
     *
     * @return the value as a decimal string, or {@code ""} if the counter or group is absent
     */
    private String getCounterValue(TaskCounter taskCounter, String counterGroupName,
            TaskAttemptInfo attemptInfo) {
        Map<String, TezCounter> countersByGroup = attemptInfo.getCounter(taskCounter.toString());
        if (countersByGroup == null) {
            return "";
        }
        TezCounter counter = countersByGroup.get(counterGroupName);
        return counter == null ? "" : Long.toString(counter.getValue());
    }

    /** Returns the accumulated CSV result. */
    @Override // org.apache.tez.analyzer.Analyzer
    public CSVResult getResult() throws TezException {
        return this.csvResult;
    }

    @Override // org.apache.tez.analyzer.Analyzer
    public String getName() {
        return "Shuffle time analyzer";
    }

    @Override // org.apache.tez.analyzer.Analyzer
    public String getDescription() {
        return "Analyze the time taken for shuffle, merge and the real work done in the task";
    }

    /** Command-line entry point: runs the analyzer via {@link ToolRunner} and prints results. */
    public static void main(String[] strArr) throws Exception {
        Configuration configuration = new Configuration();
        ShuffleTimeAnalyzer shuffleTimeAnalyzer = new ShuffleTimeAnalyzer(configuration);
        int run = ToolRunner.run(configuration, shuffleTimeAnalyzer, strArr);
        shuffleTimeAnalyzer.printResults();
        System.exit(run);
    }
}
