package dev.restate.sdk.core;

import dev.restate.generated.service.protocol.Protocol;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.common.syscalls.HandlerDefinition;
import dev.restate.sdk.common.syscalls.HandlerRunner;
import dev.restate.sdk.common.syscalls.HandlerSpecification;
import dev.restate.sdk.common.syscalls.SyscallCallback;
import dev.restate.sdk.common.syscalls.Syscalls;
import dev.restate.sdk.core.InvocationFlow;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dev/restate/sdk/core/ResolvedEndpointHandlerImpl.class */
/**
 * {@link ResolvedEndpointHandler} implementation that wires a single handler to an
 * {@link InvocationStateMachine}.
 *
 * <p>Incoming buffers are forwarded to a {@code MessageDecoder} that wraps the state machine in an
 * {@code ExceptionCatchingSubscriber}; outgoing buffers are published through a
 * {@code MessageEncoder} built on the same state machine. When a subscriber attaches, the state
 * machine is started and, once it hands back the request, the user handler is run against a
 * syscalls object bound to that request.
 */
final class ResolvedEndpointHandlerImpl implements ResolvedEndpointHandler {
    private static final Logger LOG = LogManager.getLogger(ResolvedEndpointHandlerImpl.class);

    private final Protocol.ServiceProtocolVersion serviceProtocolVersion;
    private final InvocationStateMachine stateMachine;
    private final InvocationFlow.InvocationInputSubscriber input;
    private final InvocationFlow.InvocationOutputPublisher output;
    private final HandlerSpecification<Object, Object> spec;
    private final HandlerRunner<Object, Object, Object> wrappedHandler;
    private final Object serviceOptions;
    private final Executor syscallsExecutor;

    /**
     * Decorator around a {@link HandlerRunner} that converts anything thrown synchronously by the
     * delegate into a call to {@link SyscallCallback#onCancel(Throwable)} on the supplied callback,
     * so the caller only has to deal with the callback.
     */
    private static final class HandlerRunnerWrapper<REQ, RES, O> implements HandlerRunner<REQ, RES, O> {
        private final HandlerRunner<REQ, RES, O> handler;

        private HandlerRunnerWrapper(HandlerRunner<REQ, RES, O> handlerRunner) {
            this.handler = handlerRunner;
        }

        /**
         * Runs the delegate; any {@link Throwable} escaping it is reported through
         * {@code syscallCallback.onCancel} instead of being propagated.
         */
        @Override
        public void run(HandlerSpecification<REQ, RES> handlerSpecification, Syscalls syscalls, O o, SyscallCallback<ByteBuffer> syscallCallback) {
            try {
                this.handler.run(handlerSpecification, syscalls, o, syscallCallback);
            } catch (Throwable th) {
                // Deliberately broad: every failure of the delegate must reach the callback.
                syscallCallback.onCancel(th);
            }
        }
    }

    /**
     * Creates the handler glue for one invocation.
     *
     * @param serviceProtocolVersion protocol version used to compute {@link #responseContentType()}
     * @param invocationStateMachine state machine that is fed by the input side, drained by the
     *     output side, and started in {@link #subscribe(Flow.Subscriber)}
     * @param handlerDefinition source of the handler specification and runner
     * @param obj options object passed unchanged as the third argument to the handler runner
     * @param executor when non-null, syscalls are wrapped in an {@code ExecutorSwitchingSyscalls}
     *     using this executor; when null, the plain {@code SyscallsImpl} is used
     */
    public ResolvedEndpointHandlerImpl(Protocol.ServiceProtocolVersion serviceProtocolVersion, InvocationStateMachine invocationStateMachine, HandlerDefinition<?, ?, Object> handlerDefinition, Object obj, Executor executor) {
        this.serviceProtocolVersion = serviceProtocolVersion;
        this.stateMachine = invocationStateMachine;
        this.input = new MessageDecoder(new ExceptionCatchingSubscriber(invocationStateMachine));
        this.output = new MessageEncoder(invocationStateMachine);

        // The definition's request/response type arguments are wildcards here, while this class
        // stores them as Object. The casts are unchecked by construction; they are kept explicit
        // and suppressed on the narrowest possible scope instead of relying on a raw type.
        @SuppressWarnings("unchecked")
        HandlerSpecification<Object, Object> uncheckedSpec = (HandlerSpecification<Object, Object>) handlerDefinition.getSpec();
        this.spec = uncheckedSpec;
        @SuppressWarnings("unchecked")
        HandlerRunner<Object, Object, Object> uncheckedRunner = (HandlerRunner<Object, Object, Object>) handlerDefinition.getRunner();
        this.wrappedHandler = new HandlerRunnerWrapper<>(uncheckedRunner);

        this.serviceOptions = obj;
        this.syscallsExecutor = executor;
    }

    /**
     * Attaches {@code subscriber} to the output publisher and starts the state machine. When the
     * state machine delivers the request, the wrapped handler is run; its successful result is
     * written as output and the invocation is ended, and its failure is routed to
     * {@link #end(SyscallsInternal, Throwable)}.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        LOG.trace("Start processing invocation");
        this.output.subscribe(subscriber);
        this.stateMachine.startAndConsumeInput(SyscallCallback.of(request -> {
            // Build the base syscalls once and only decorate it when an executor is configured.
            SyscallsImpl directSyscalls = new SyscallsImpl(request, this.stateMachine);
            final SyscallsInternal syscalls;
            if (this.syscallsExecutor != null) {
                syscalls = new ExecutorSwitchingSyscalls(directSyscalls, this.syscallsExecutor);
            } else {
                syscalls = directSyscalls;
            }
            this.wrappedHandler.run(
                    this.spec,
                    syscalls,
                    this.serviceOptions,
                    SyscallCallback.of(
                            result -> writeOutputAndEnd(syscalls, result),
                            failure -> end(syscalls, failure)));
        }, ignored -> {
            // Intentionally a no-op: a failure to start/consume input is not handled here.
            // NOTE(review): presumably the state machine reports this failure itself - confirm.
        }));
    }

    /** Forwards the subscription to the input decoder. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.input.onSubscribe(subscription);
    }

    /** Forwards an incoming buffer to the input decoder. */
    @Override
    public void onNext(ByteBuffer byteBuffer) {
        this.input.onNext(byteBuffer);
    }

    /** Forwards an upstream error to the input decoder. */
    @Override
    public void onError(Throwable th) {
        this.input.onError(th);
    }

    /** Forwards upstream completion to the input decoder. */
    @Override
    public void onComplete() {
        this.input.onComplete();
    }

    /**
     * @return the header value that {@code ServiceProtocol} derives from the protocol version this
     *     handler was created with
     */
    @Override
    public String responseContentType() {
        return ServiceProtocol.serviceProtocolVersionToHeaderValue(this.serviceProtocolVersion);
    }

    /**
     * Writes the handler result as output. On a successful write the invocation is ended without
     * error; on a failed write the error is passed to {@code syscallsInternal.fail}.
     */
    private void writeOutputAndEnd(SyscallsInternal syscallsInternal, ByteBuffer byteBuffer) {
        Runnable onWritten = () -> {
            LOG.trace("Wrote output message");
            end(syscallsInternal, null);
        };
        // The bound method reference already null-checks its receiver, so no separate
        // Objects.requireNonNull call is needed.
        syscallsInternal.writeOutput(byteBuffer, SyscallCallback.ofVoid(onWritten, syscallsInternal::fail));
    }

    /**
     * Terminates the invocation according to {@code th}:
     *
     * <ul>
     *   <li>{@code null}, or a throwable for which {@code Util.containsSuspendedException} is true:
     *       the syscalls are closed;
     *   <li>a throwable for which {@code Util.isTerminalException} is false: logged at warn level
     *       and passed to {@code fail};
     *   <li>otherwise: logged at warn level, written as output as a {@link TerminalException}, then
     *       the syscalls are closed (or {@code fail} is called if the write fails).
     * </ul>
     */
    private void end(SyscallsInternal syscallsInternal, Throwable th) {
        boolean nothingToReport = th == null || Util.containsSuspendedException(th);
        if (nothingToReport) {
            syscallsInternal.close();
            return;
        }

        LOG.warn("Error when processing the invocation", th);

        if (Util.isTerminalException(th)) {
            Runnable onWritten = () -> {
                LOG.trace("Closed correctly with non ok exception", th);
                syscallsInternal.close();
            };
            syscallsInternal.writeOutput((TerminalException) th, SyscallCallback.ofVoid(onWritten, syscallsInternal::fail));
        } else {
            syscallsInternal.fail(th);
        }
    }
}
