package io.zeebe.broker.logstreams.processor;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.WorkflowInstanceRelatedIntent;
import io.zeebe.transport.ServerOutput;
import io.zeebe.util.sched.ActorControl;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/logstreams/processor/TypedStreamProcessor.class */
/**
 * Stream processor that dispatches each logged event to a {@link TypedRecordProcessor} selected by
 * the record type, value type and intent found in the event's metadata.
 *
 * <p>A single {@link RecordMetadata}, a single {@link TypedEventImpl} and one value object per
 * registered {@link ValueType} are reused for every event, so the objects handed to the record
 * processors are only valid until the next call to {@link #onEvent(LoggedEvent)}.
 */
public class TypedStreamProcessor implements StreamProcessor {
    private static final Logger LOG = Loggers.WORKFLOW_PROCESSOR_LOGGER;
    protected final ServerOutput output;
    protected final RecordProcessorMap recordProcessors;
    protected final ZeebeState zeebeState;
    /** Maps each value type to the class used to deserialize its record value. */
    protected final EnumMap<ValueType, Class<? extends UnpackedObject>> eventRegistry;
    /** Holds one reusable value instance per entry of {@link #eventRegistry}. */
    protected final EnumMap<ValueType, UnpackedObject> eventCache;
    private final TypedStreamEnvironment environment;
    private DelegatingEventProcessor eventProcessorWrapper;
    private ActorControl actor;
    private StreamProcessorContext streamProcessorContext;
    private TypedStreamWriterImpl streamWriter;
    protected final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
    protected final RecordMetadata metadata = new RecordMetadata();
    protected final TypedEventImpl typedEvent = new TypedEventImpl();

    /**
     * Event processor that forwards the individual processing steps to the currently wrapped
     * {@link TypedRecordProcessor} and handles processing failures (rejection of commands and
     * blacklisting).
     */
    protected static class DelegatingEventProcessor implements EventProcessor {
        public static final String PROCESSING_ERROR_MESSAGE = "Expected to process event %s without errors, but exception occurred with message %s .";
        protected final int streamProcessorId;
        protected final LogStream logStream;
        protected final TypedStreamWriterImpl writer;
        protected final TypedResponseWriterImpl responseWriter;
        private final ZeebeState zeebeState;
        protected TypedRecordProcessor<?> eventProcessor;
        protected TypedEventImpl event;
        private SideEffectProducer sideEffectProducer;
        private long position;

        /**
         * @param streamProcessorId id passed to the writer as part of the source context
         * @param output server output used to create the response writer
         * @param logStream log stream whose partition id is given to the response writer
         * @param writer writer collecting the follow-up records of the processed event
         * @param zeebeState state used to check and update the blacklist
         */
        public DelegatingEventProcessor(int streamProcessorId, ServerOutput output, LogStream logStream, TypedStreamWriterImpl writer, ZeebeState zeebeState) {
            this.streamProcessorId = streamProcessorId;
            this.logStream = logStream;
            this.writer = writer;
            this.responseWriter = new TypedResponseWriterImpl(output, logStream.getPartitionId());
            this.zeebeState = zeebeState;
        }

        /**
         * Points this wrapper at the processor and event that the following calls operate on.
         *
         * @param eventProcessor processor the event is delegated to
         * @param event the event to process
         * @param position log position of the event
         */
        public void wrap(TypedRecordProcessor<?> eventProcessor, TypedEventImpl event, long position) {
            this.eventProcessor = eventProcessor;
            this.event = event;
            this.position = position;
        }

        /**
         * Resets the writers and delegates the wrapped event to the wrapped processor, unless the
         * state reports the event as blacklisted, in which case the processor is not invoked.
         */
        public void processEvent() {
            resetOutput();
            // The response writer is the default side effect; the processor may replace it through
            // the setter that is passed along below.
            this.sideEffectProducer = this.responseWriter;
            if (this.zeebeState.isOnBlacklist(this.event)) {
                return;
            }
            this.eventProcessor.processRecord(this.position, this.event, this.responseWriter, this.writer, this::setSideEffectProducer);
        }

        /**
         * Discards everything written so far, logs the failure, rejects the record if it is a
         * command and blacklists the event if its intent requests that.
         *
         * @param exception the exception raised while processing the wrapped event
         */
        public void processingFailed(Exception exception) {
            resetOutput();
            String errorMessage = String.format(PROCESSING_ERROR_MESSAGE, this.event, exception.getMessage());
            TypedStreamProcessor.LOG.error(errorMessage, exception);
            if (this.event.metadata.getRecordType() == RecordType.COMMAND) {
                sendCommandRejectionOnException(errorMessage);
                writeCommandRejectionOnException(errorMessage);
            }
            if (shouldBeBlacklisted(this.event.getMetadata().getIntent())) {
                this.zeebeState.blacklist(this.event);
            }
        }

        /** Returns true only for workflow instance related intents that ask for blacklisting. */
        private boolean shouldBeBlacklisted(Intent intent) {
            return isWorkflowInstanceRelated(intent)
                && ((WorkflowInstanceRelatedIntent) intent).shouldBlacklistInstanceOnError();
        }

        private boolean isWorkflowInstanceRelated(Intent intent) {
            return intent instanceof WorkflowInstanceRelatedIntent;
        }

        /** Resets both writers and re-applies the source context for the current event. */
        private void resetOutput() {
            this.responseWriter.reset();
            this.writer.reset();
            this.writer.configureSourceContext(this.streamProcessorId, this.position);
        }

        private void writeCommandRejectionOnException(String reason) {
            this.writer.appendRejection(this.event, RejectionType.PROCESSING_ERROR, reason);
        }

        private void sendCommandRejectionOnException(String reason) {
            this.responseWriter.writeRejectionOnCommand(this.event, RejectionType.PROCESSING_ERROR, reason);
        }

        public void setSideEffectProducer(SideEffectProducer sideEffectProducer) {
            this.sideEffectProducer = sideEffectProducer;
        }

        /**
         * Flushes the currently configured side effect producer.
         *
         * @return the result of the producer's {@code flush()}
         */
        public boolean executeSideEffects() {
            return this.sideEffectProducer.flush();
        }

        /**
         * Flushes the typed stream writer given at construction time.
         *
         * @param logStreamRecordWriter not used by this implementation
         * @return the result of the typed stream writer's {@code flush()}
         */
        public long writeEvent(LogStreamRecordWriter logStreamRecordWriter) {
            return this.writer.flush();
        }
    }

    /**
     * Creates the processor and pre-allocates one value object per registered value type.
     *
     * @param output server output handed to the response writer
     * @param recordProcessors processors to dispatch to; each one is also registered as a
     *     lifecycle listener
     * @param lifecycleListeners additional listeners, notified after the record processors
     * @param eventRegistry value class per value type; each class must be instantiable through a
     *     no-argument constructor
     * @param zeebeState state used for blacklisting and key generation
     * @param environment environment returned by {@link #getEnvironment()}
     * @throws IllegalStateException if a registered value class cannot be instantiated
     */
    public TypedStreamProcessor(ServerOutput output, RecordProcessorMap recordProcessors, List<StreamProcessorLifecycleAware> lifecycleListeners, EnumMap<ValueType, Class<? extends UnpackedObject>> eventRegistry, ZeebeState zeebeState, TypedStreamEnvironment environment) {
        this.output = output;
        this.recordProcessors = recordProcessors;
        this.zeebeState = zeebeState;
        recordProcessors.values().forEachRemaining(processor -> {
            this.lifecycleListeners.add(processor);
        });
        this.lifecycleListeners.addAll(lifecycleListeners);
        this.eventCache = new EnumMap<>(ValueType.class);
        // onEvent() looks the reusable value object up in this cache, so it has to contain an
        // instance for every registered value type.
        eventRegistry.forEach((valueType, valueClass) -> {
            this.eventCache.put(valueType, newValueInstance(valueType, valueClass));
        });
        this.eventRegistry = eventRegistry;
        this.environment = environment;
    }

    /** Instantiates a value class through its no-argument constructor. */
    private static UnpackedObject newValueInstance(ValueType valueType, Class<? extends UnpackedObject> valueClass) {
        try {
            return valueClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                String.format("Failed to instantiate value class %s registered for value type %s", valueClass.getName(), valueType), e);
        }
    }

    /**
     * Creates the stream writer and the delegating event processor for the given context, stores
     * the context and its actor control, and then notifies all lifecycle listeners.
     */
    public void onOpen(StreamProcessorContext streamProcessorContext) {
        LogStream logStream = streamProcessorContext.getLogStream();
        this.streamWriter = new TypedStreamWriterImpl(logStream, this.eventRegistry, getKeyGenerator());
        this.eventProcessorWrapper = new DelegatingEventProcessor(streamProcessorContext.getId(), this.output, logStream, this.streamWriter, this.zeebeState);
        this.actor = streamProcessorContext.getActorControl();
        this.streamProcessorContext = streamProcessorContext;
        this.lifecycleListeners.forEach(listener -> listener.onOpen(this));
    }

    /** Notifies all lifecycle listeners that recovery has finished. */
    public void onRecovered() {
        this.lifecycleListeners.forEach(listener -> listener.onRecovered(this));
    }

    /** Notifies all lifecycle listeners that the processor is closing. */
    public void onClose() {
        this.lifecycleListeners.forEach(listener -> listener.onClose());
    }

    /**
     * Reads the event's metadata and value and prepares the shared event processor for it.
     *
     * @param loggedEvent the event read from the log stream
     * @return the shared event processor wrapping the matching record processor, or {@code null}
     *     if no record processor is registered for the event
     * @throws IllegalStateException if a processor is registered but no value object exists for
     *     the event's value type
     */
    public EventProcessor onEvent(LoggedEvent loggedEvent) {
        long position = loggedEvent.getPosition();
        this.metadata.reset();
        loggedEvent.readMetadata(this.metadata);
        ValueType valueType = this.metadata.getValueType();
        TypedRecordProcessor<?> recordProcessor = this.recordProcessors.get(this.metadata.getRecordType(), valueType, this.metadata.getIntent().value());
        if (recordProcessor == null) {
            return null;
        }
        UnpackedObject value = this.eventCache.get(valueType);
        if (value == null) {
            throw new IllegalStateException("No value class registered for value type " + valueType);
        }
        value.reset();
        loggedEvent.readValue(value);
        this.typedEvent.wrap(loggedEvent, this.metadata, value);
        this.eventProcessorWrapper.wrap(recordProcessor, this.typedEvent, position);
        return this.eventProcessorWrapper;
    }

    /** Returns the actor control taken from the context in {@code onOpen}; null before that. */
    public ActorControl getActor() {
        return this.actor;
    }

    /** Returns the context passed to {@code onOpen}; null before that. */
    public StreamProcessorContext getStreamProcessorContext() {
        return this.streamProcessorContext;
    }

    public TypedStreamEnvironment getEnvironment() {
        return this.environment;
    }

    /** Returns the key generator provided by the Zeebe state. */
    public KeyGenerator getKeyGenerator() {
        return this.zeebeState.getKeyGenerator();
    }

    /** Returns the stream writer created in {@code onOpen}; null before that. */
    public TypedStreamWriterImpl getStreamWriter() {
        return this.streamWriter;
    }
}
