package org.apache.tez.dag.history.logging.impl;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.codehaus.jettison.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.class */
/**
 * History logging service that serializes each {@link DAGHistoryEvent} to JSON and appends it,
 * followed by {@link #RECORD_SEPARATOR}, to a single file named
 * {@code history.txt.<applicationAttemptId>}.
 *
 * <p>Events handed to {@link #handle(DAGHistoryEvent)} are queued and written by a dedicated
 * thread started in {@code serviceStart}. On stop, whatever is still queued is written by the
 * stopping thread before the stream is flushed and closed.
 *
 * <p>After {@code tez.simple.history.max.errors} (default 10) consecutive write failures the
 * service disables itself and silently drops all further events.
 */
public class SimpleHistoryLoggingService extends HistoryLoggingService {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryLoggingService.class);

    /** Base name of the history file; the application attempt id is appended after a dot. */
    public static final String LOG_FILE_NAME_PREFIX = "history.txt";

    /** Terminator written after every JSON record: U+0001 followed by the platform line separator. */
    public static final String RECORD_SEPARATOR = "\u0001" + System.getProperty("line.separator");

    private Path logFileLocation;
    private FileSystem logFileFS;
    /** Open history file; null before start and after stop. Written to only under this object's monitor. */
    private FSDataOutputStream outputStream;
    private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    private Thread eventHandlingThread;
    private final AtomicBoolean stopped;
    /** Number of write failures since the last successful write. */
    private int consecutiveErrors;
    private int maxErrors;
    /** Set once {@link #maxErrors} consecutive write failures occurred; events are dropped afterwards. */
    private boolean loggingDisabled;

    public SimpleHistoryLoggingService() {
        super(SimpleHistoryLoggingService.class.getName());
        this.eventQueue = new LinkedBlockingQueue<>();
        this.stopped = new AtomicBoolean(false);
        this.consecutiveErrors = 0;
        this.loggingDisabled = false;
    }

    /**
     * Resolves the history file location and the error threshold from the configuration.
     *
     * <p>If {@code tez.simple.history.logging.dir} is set, that directory is used (and created if
     * missing). Otherwise one of the application's log directories is picked at random on the raw
     * local file system; if none is available the file name is used as a relative path.
     *
     * @param configuration service configuration
     * @throws Exception if the target file system cannot be accessed
     */
    protected void serviceInit(Configuration configuration) throws Exception {
        String configuredDir = configuration.get("tez.simple.history.logging.dir");
        String logFileName = LOG_FILE_NAME_PREFIX + "." + this.appContext.getApplicationAttemptId();
        if (configuredDir == null || configuredDir.isEmpty()) {
            // Guard against a null/empty array: Random.nextInt(0) throws IllegalArgumentException.
            String[] logDirs = this.appContext.getLogDirs();
            String logDir = null;
            if (logDirs != null && logDirs.length > 0) {
                logDir = logDirs[new Random().nextInt(logDirs.length)];
            }
            LOG.info("Log file location for SimpleHistoryLoggingService not specified, defaulting to containerLogDir=" + logDir);
            this.logFileFS = FileSystem.getLocal(configuration).getRawFileSystem();
            this.logFileLocation = logDir != null ? new Path(logDir, logFileName) : new Path(logFileName);
        } else {
            LOG.info("Using configured log file location for SimpleHistoryLoggingService logDirPath=" + configuredDir);
            Path logDirPath = new Path(configuredDir);
            this.logFileFS = logDirPath.getFileSystem(configuration);
            if (!this.logFileFS.exists(logDirPath)) {
                this.logFileFS.mkdirs(logDirPath);
            }
            this.logFileLocation = new Path(this.logFileFS.resolvePath(logDirPath), logFileName);
        }
        this.maxErrors = configuration.getInt("tez.simple.history.max.errors", 10);
        LOG.info("Initializing SimpleHistoryLoggingService, logFileLocation=" + this.logFileLocation + ", maxErrors=" + this.maxErrors);
        super.serviceInit(configuration);
    }

    /**
     * Creates (overwriting if present) the history file and starts the thread that drains the
     * event queue into it.
     *
     * @throws Exception if the history file cannot be created
     */
    protected void serviceStart() throws Exception {
        LOG.info("Starting SimpleHistoryLoggingService");
        this.outputStream = this.logFileFS.create(this.logFileLocation, true);
        this.eventHandlingThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                    try {
                        handleEvent(eventQueue.take());
                    } catch (InterruptedException e) {
                        LOG.info("EventQueue take interrupted. Returning");
                        return;
                    }
                }
            }
        }, "HistoryEventHandlingThread");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    /**
     * Signals the handler thread to stop, writes any events still queued, then flushes and closes
     * the history file. Failures while flushing/closing are logged, not propagated.
     *
     * @throws Exception if the superclass stop fails
     */
    protected void serviceStop() throws Exception {
        LOG.info("Stopping SimpleHistoryLoggingService, eventQueueBacklog=" + this.eventQueue.size());
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        // Drain and close under the same monitor handleEvent() uses, so the stream is never
        // closed while the handler thread is in the middle of writing a record.
        synchronized (this) {
            DAGHistoryEvent pending;
            while ((pending = this.eventQueue.poll()) != null) {
                handleEvent(pending);
            }
            if (this.outputStream != null) {
                try {
                    try {
                        this.outputStream.hflush();
                    } finally {
                        // Always release the stream, even when the flush fails.
                        this.outputStream.close();
                    }
                } catch (IOException e) {
                    LOG.warn("Failed to close output stream", e);
                } finally {
                    // Late handleEvent() calls must not touch a closed stream.
                    this.outputStream = null;
                }
            }
        }
        super.serviceStop();
    }

    /**
     * Queues the event for asynchronous writing; never blocks on file I/O.
     *
     * @param dAGHistoryEvent event to record
     */
    @Override // org.apache.tez.dag.history.logging.HistoryLoggingService
    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        this.eventQueue.add(dAGHistoryEvent);
    }

    /**
     * Writes one event to the history file as UTF-8 encoded JSON followed by
     * {@link #RECORD_SEPARATOR}.
     *
     * <p>The event is dropped when logging has been disabled or the stream is not open. An event
     * that cannot be converted to JSON is skipped without affecting the error counter. An I/O
     * failure increments the consecutive-error counter; reaching {@code maxErrors} disables
     * logging permanently. A successful write resets the counter.
     *
     * @param dAGHistoryEvent event to write
     */
    public synchronized void handleEvent(DAGHistoryEvent dAGHistoryEvent) {
        if (this.loggingDisabled) {
            return;
        }
        if (this.outputStream == null) {
            // Service was never started or has already been stopped.
            LOG.debug("Dropping history event, output stream is not open");
            return;
        }
        LOG.info("Writing event " + dAGHistoryEvent.getHistoryEvent().getEventType() + " to history file");

        final String record;
        try {
            record = HistoryEventJsonConversion.convertToJson(dAGHistoryEvent.getHistoryEvent()).toString()
                    + RECORD_SEPARATOR;
        } catch (JSONException e) {
            // Nothing was written, so this says nothing about the health of the stream.
            LOG.warn("Failed to convert event to json", e);
            return;
        }

        try {
            // DataOutputStream.writeBytes(String) keeps only the low byte of every char, which
            // corrupts non-ASCII text; encode explicitly instead.
            this.outputStream.write(record.getBytes(StandardCharsets.UTF_8));
            this.consecutiveErrors = 0;
        } catch (IOException e) {
            this.consecutiveErrors++;
            if (this.consecutiveErrors < this.maxErrors) {
                LOG.error("Failed to write to output stream, consecutiveErrorCount=" + this.consecutiveErrors, e);
            } else {
                this.loggingDisabled = true;
                LOG.error("Disabling SimpleHistoryLoggingService due to multiple errors,consecutive max errors reached, maxErrors=" + this.maxErrors, e);
            }
        }
    }
}
