package dev.responsive.kafka.api.async.internals.events;

import dev.responsive.kafka.api.async.internals.AsyncUtils;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/events/AsyncEvent.class */
/**
 * Tracks a single input record as it moves through the async processing state machine.
 *
 * <p>An event captures the input record's key and value, the record context and the
 * stream/system time supplied at construction, the {@link Runnable} that processes the
 * input record, and the forwards and writes that were buffered while processing it.
 *
 * <p>The transitions enforced by this class are:
 * <pre>
 *   SCHEDULING -> TO_PROCESS -> PROCESSING -> TO_FINALIZE -> FINALIZING -> DONE
 *   (any state except DONE) -> FAILED -> DONE
 * </pre>
 * Any other transition throws an {@link IllegalStateException}.
 *
 * <p>This class performs no synchronization of its own.
 */
public class AsyncEvent {
    private final Logger log;
    // System.nanoTime() reading taken when the event entered its current state
    private long transitionTimeNanos;
    private State currentState;
    private final Object inputRecordKey;
    private final Object inputRecordValue;
    private final String asyncProcessorName;
    private final TaskId taskId;
    private final long systemTime;
    private final long streamTime;
    private final ProcessorRecordContext recordContext;
    private final Runnable processInputRecord;
    private final List<StateTransitionListener> stateTransitionListeners;
    private final Queue<DelayedForward<?, ?>> outputForwards;
    private final Queue<DelayedWrite<?, ?>> outputWrites;
    // Only non-null once transitionToFailed has been called
    private RuntimeException processingException;

    /**
     * The states an {@link AsyncEvent} can be in. See the transition methods on
     * {@link AsyncEvent} for which moves between states are permitted.
     */
    public enum State {
        SCHEDULING,
        TO_PROCESS,
        PROCESSING,
        TO_FINALIZE,
        FINALIZING,
        FAILED,
        DONE
    }

    /**
     * Callback invoked on every state transition, before the event's own state is updated.
     */
    public interface StateTransitionListener {
        /**
         * @param oldState          the state the event is leaving
         * @param oldStateStartNanos the {@link System#nanoTime()} reading taken when the
         *                          event entered {@code oldState}
         * @param newState          the state the event is entering
         * @param newStateStartNanos the {@link System#nanoTime()} reading taken for this
         *                          transition
         */
        void onStateTransition(
            State oldState,
            long oldStateStartNanos,
            State newState,
            long newStateStartNanos);
    }

    /**
     * Creates an event in the {@link State#SCHEDULING} state for a regular record.
     *
     * @param logPrefix                prefix used for this event's log context
     * @param inputRecord              the record whose key and value this event captures
     * @param asyncProcessorName       name returned by {@link #asyncProcessorName()}
     * @param taskId                   task id returned by {@link #taskId()}
     * @param recordContext            record context; may be null
     * @param streamTime               value returned by {@link #streamTime()}
     * @param systemTime               value returned by {@link #systemTime()}
     * @param processInputRecord       runnable returned by {@link #inputRecordProcessor()}
     * @param stateTransitionListeners listeners notified on every state transition
     */
    public AsyncEvent(
        String logPrefix,
        Record<?, ?> inputRecord,
        String asyncProcessorName,
        TaskId taskId,
        ProcessorRecordContext recordContext,
        long streamTime,
        long systemTime,
        Runnable processInputRecord,
        List<StateTransitionListener> stateTransitionListeners
    ) {
        this(
            logPrefix,
            inputRecord.key(),
            inputRecord.value(),
            asyncProcessorName,
            taskId,
            recordContext,
            streamTime,
            systemTime,
            processInputRecord,
            stateTransitionListeners);
    }

    /**
     * Creates an event in the {@link State#SCHEDULING} state for a fixed-key record.
     *
     * @param logPrefix                prefix used for this event's log context
     * @param inputRecord              the record whose key and value this event captures
     * @param asyncProcessorName       name returned by {@link #asyncProcessorName()}
     * @param taskId                   task id returned by {@link #taskId()}
     * @param recordContext            record context; may be null
     * @param streamTime               value returned by {@link #streamTime()}
     * @param systemTime               value returned by {@link #systemTime()}
     * @param processInputRecord       runnable returned by {@link #inputRecordProcessor()}
     * @param stateTransitionListeners listeners notified on every state transition
     */
    public AsyncEvent(
        String logPrefix,
        FixedKeyRecord<?, ?> inputRecord,
        String asyncProcessorName,
        TaskId taskId,
        ProcessorRecordContext recordContext,
        long streamTime,
        long systemTime,
        Runnable processInputRecord,
        List<StateTransitionListener> stateTransitionListeners
    ) {
        this(
            logPrefix,
            inputRecord.key(),
            inputRecord.value(),
            asyncProcessorName,
            taskId,
            recordContext,
            streamTime,
            systemTime,
            processInputRecord,
            stateTransitionListeners);
    }

    private AsyncEvent(
        String logPrefix,
        Object inputRecordKey,
        Object inputRecordValue,
        String asyncProcessorName,
        TaskId taskId,
        ProcessorRecordContext recordContext,
        long streamTime,
        long systemTime,
        Runnable processInputRecord,
        List<StateTransitionListener> stateTransitionListeners
    ) {
        this.outputForwards = new LinkedList<>();
        this.outputWrites = new LinkedList<>();
        this.processingException = null;
        this.currentState = State.SCHEDULING;
        this.transitionTimeNanos = System.nanoTime();
        this.inputRecordKey = inputRecordKey;
        this.inputRecordValue = inputRecordValue;
        this.asyncProcessorName = asyncProcessorName;
        this.taskId = taskId;
        this.streamTime = streamTime;
        this.systemTime = systemTime;
        this.recordContext = recordContext;
        this.processInputRecord = processInputRecord;
        this.stateTransitionListeners = stateTransitionListeners;

        // When a record context is available, include its offset in the log prefix
        if (recordContext == null) {
            this.log = new LogContext(logPrefix).logger(AsyncEvent.class);
        } else {
            this.log = new LogContext(String.format("%s <%d> ", logPrefix, recordContext.offset()))
                .logger(AsyncEvent.class);
        }
    }

    /**
     * @return the runnable supplied at construction for processing the input record
     */
    public Runnable inputRecordProcessor() {
        return this.processInputRecord;
    }

    /**
     * Appends a forward to the queue of buffered output forwards.
     */
    public void addForwardedRecord(DelayedForward<?, ?> delayedForward) {
        this.outputForwards.add(delayedForward);
    }

    /**
     * Appends a write to the queue of buffered output writes.
     */
    public void addWrittenRecord(DelayedWrite<?, ?> delayedWrite) {
        this.outputWrites.add(delayedWrite);
    }

    /**
     * @return the exception passed to {@link #transitionToFailed(RuntimeException)},
     *         or empty if none has been recorded
     */
    public Optional<RuntimeException> processingException() {
        return Optional.ofNullable(this.processingException);
    }

    /**
     * Removes and returns the oldest buffered forward.
     *
     * @return the next forward, or null if none remain
     */
    // The queue stores wildcard-typed forwards; the caller chooses the key/value types.
    @SuppressWarnings("unchecked")
    public <KOut, VOut> DelayedForward<KOut, VOut> nextForward() {
        return (DelayedForward<KOut, VOut>) this.outputForwards.poll();
    }

    /**
     * Removes and returns the oldest buffered write.
     *
     * @return the next write, or null if none remain
     */
    // The queue stores wildcard-typed writes; the caller chooses the key/value types.
    @SuppressWarnings("unchecked")
    public <KS, VS> DelayedWrite<KS, VS> nextWrite() {
        return (DelayedWrite<KS, VS>) this.outputWrites.poll();
    }

    /**
     * @return the state this event is currently in
     */
    public State currentState() {
        return this.currentState;
    }

    /**
     * Notifies every listener of the transition and then records the new state and the
     * time at which it was entered. Performs no validation; callers check preconditions.
     */
    private void transitionTo(State newState) {
        final long nowNanos = System.nanoTime();
        this.stateTransitionListeners.forEach(listener ->
            listener.onStateTransition(
                this.currentState, this.transitionTimeNanos, newState, nowNanos));
        this.transitionTimeNanos = nowNanos;
        this.currentState = newState;
    }

    /**
     * Moves this event to {@link State#FAILED} and records the given exception.
     *
     * @param runtimeException the exception to expose via {@link #processingException()}
     * @throws IllegalStateException if the event is already {@link State#DONE}
     */
    public void transitionToFailed(RuntimeException runtimeException) {
        if (this.currentState.equals(State.DONE)) {
            this.log.error(
                "[{}] Attempted to mark async event as failed but it was already in the DONE state",
                this.currentState.name());
            throw new IllegalStateException(
                "Cannot transition to FAILED from the state " + this.currentState.name());
        }
        transitionTo(State.FAILED);
        this.processingException = runtimeException;
    }

    /**
     * Moves this event from {@link State#SCHEDULING} to {@link State#TO_PROCESS}.
     *
     * @throws IllegalStateException if the event is not in {@link State#SCHEDULING}
     */
    public void transitionToToProcess() {
        if (!this.currentState.equals(State.SCHEDULING)) {
            this.log.error(
                "[{}] Attempted to mark an async event as ready for processing but the event was not in the SCHEDULING state",
                this.currentState.name());
            throw new IllegalStateException(
                "Cannot transition to TO_PROCESS from the state " + this.currentState.name());
        }
        transitionTo(State.TO_PROCESS);
    }

    /**
     * Moves this event from {@link State#TO_PROCESS} to {@link State#PROCESSING}.
     *
     * @throws IllegalStateException if the event is not in {@link State#TO_PROCESS}
     */
    public void transitionToProcessing() {
        if (!this.currentState.equals(State.TO_PROCESS)) {
            this.log.error(
                "[{}] Attempted to mark an async event as being processed but the event was not in the TO_PROCESS state",
                this.currentState.name());
            throw new IllegalStateException(
                "Cannot transition to PROCESSING from the state " + this.currentState.name());
        }
        transitionTo(State.PROCESSING);
    }

    /**
     * Moves this event from {@link State#PROCESSING} to {@link State#TO_FINALIZE}.
     *
     * @throws IllegalStateException if the event is not in {@link State#PROCESSING}
     */
    public void transitionToToFinalize() {
        if (!this.currentState.equals(State.PROCESSING)) {
            this.log.error(
                "[{}] Attempted to mark an async event as ready for finalization but the event was not in the PROCESSING state",
                this.currentState.name());
            throw new IllegalStateException(
                "Cannot transition to TO_FINALIZE from the state " + this.currentState.name());
        }
        transitionTo(State.TO_FINALIZE);
    }

    /**
     * Moves this event from {@link State#TO_FINALIZE} to {@link State#FINALIZING}.
     *
     * @throws IllegalStateException if the event is not in {@link State#TO_FINALIZE}
     */
    public void transitionToFinalizing() {
        if (!this.currentState.equals(State.TO_FINALIZE)) {
            this.log.error(
                "[{}] Attempted to mark an async event as finalizing but the event was not in the TO_FINALIZE state",
                this.currentState.name());
            throw new IllegalStateException(
                "Cannot transition to FINALIZING from the state " + this.currentState.name());
        }
        transitionTo(State.FINALIZING);
    }

    /**
     * Moves this event to {@link State#DONE}. Calling this on an event that is already
     * {@link State#DONE} only logs a warning and returns.
     *
     * @throws IllegalStateException if the event is in neither {@link State#FINALIZING}
     *         nor {@link State#FAILED}, or if any buffered forwards or writes remain
     */
    public void transitionToDone() {
        if (this.currentState.equals(State.DONE)) {
            this.log.warn(
                "we are transitioning to DONE from DONE. This should only happen if "
                    + "we hit a failure during finalization");
            return;
        }

        final boolean finalizingOrFailed =
            this.currentState.equals(State.FINALIZING) || this.currentState.equals(State.FAILED);
        if (!finalizingOrFailed) {
            this.log.error(
                "[{}] Attempted to mark an async event as DONE but the event was not in the FINALIZING or FAILED state",
                this.currentState.name());
            throw new IllegalStateException(
                "Cannot transition to DONE from the state " + this.currentState.name());
        }

        // Both output queues must be fully drained before the event can complete
        if (!this.outputForwards.isEmpty() || !this.outputWrites.isEmpty()) {
            this.log.error(
                "[{}] Attempted to mark an async event as complete without draining all output queues "
                    + "first. Remaining forwards={} and remaining writes={}",
                this.currentState.name(),
                this.outputForwards.size(),
                this.outputWrites.size());
            throw new IllegalStateException(
                "Can't transition to DONE when there are still records in the output buffers");
        }
        transitionTo(State.DONE);
    }

    /**
     * @return the record context supplied at construction; may be null
     */
    public ProcessorRecordContext recordContext() {
        return this.recordContext;
    }

    /**
     * @return the stream time supplied at construction
     */
    public long streamTime() {
        return this.streamTime;
    }

    /**
     * @return the system time supplied at construction
     */
    public long systemTime() {
        return this.systemTime;
    }

    /**
     * @return the async processor name supplied at construction
     */
    public String asyncProcessorName() {
        return this.asyncProcessorName;
    }

    /**
     * @return the task id supplied at construction
     */
    public TaskId taskId() {
        return this.taskId;
    }

    /**
     * @return the partition of this event's task id
     */
    public int partition() {
        return this.taskId.partition();
    }

    /**
     * @return the input record's key, cast (unchecked) to the caller's expected type
     */
    @SuppressWarnings("unchecked")
    public <KIn> KIn inputRecordKey() {
        return (KIn) this.inputRecordKey;
    }

    /**
     * @return a new {@link KeyValue} holding the input record's key and value, each cast
     *         (unchecked) to the caller's expected types
     */
    @SuppressWarnings("unchecked")
    public <K, V> KeyValue<K, V> inputRecord() {
        return new KeyValue<>((K) this.inputRecordKey, (V) this.inputRecordValue);
    }

    /**
     * Two events are equal when their task id, system time, stream time, input record key
     * and record context are all equal. The current state, input value and buffered
     * outputs are not considered.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final AsyncEvent that = (AsyncEvent) obj;
        // Value (not reference) comparison so that equals agrees with hashCode, and
        // null-safe so that a null record key does not throw.
        return this.systemTime == that.systemTime
            && this.streamTime == that.streamTime
            && Objects.equals(this.taskId, that.taskId)
            && Objects.equals(this.inputRecordKey, that.inputRecordKey)
            && Objects.equals(this.recordContext, that.recordContext);
    }

    /**
     * Hashes the same fields that {@link #equals(Object)} compares. The record context is
     * hashed via {@code AsyncUtils.processorRecordContextHashCode} rather than its own
     * {@code hashCode()}.
     */
    @Override
    public int hashCode() {
        int result = Objects.hash(this.taskId, this.inputRecordKey);
        result = 31 * result + Long.hashCode(this.systemTime);
        result = 31 * result + Long.hashCode(this.streamTime);
        result = 31 * result
            + (this.recordContext != null
                ? AsyncUtils.processorRecordContextHashCode(this.recordContext, false)
                : 0);
        return result;
    }

    @Override
    public String toString() {
        return "AsyncEvent{currentState=" + this.currentState
            + ", inputRecordKey=" + this.inputRecordKey
            + ", recordContext=" + this.recordContext
            + ", numForwards=" + this.outputForwards.size()
            + ", numWrites=" + this.outputWrites.size()
            + "}";
    }
}
