package dev.restate.sdk.http.vertx;

import com.google.protobuf.MessageLite;
import dev.restate.sdk.core.InvocationFlow;
import dev.restate.sdk.core.Util;
import io.grpc.Status;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import java.util.concurrent.Flow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dev/restate/sdk/http/vertx/HttpResponseFlowAdapter.class */
class HttpResponseFlowAdapter implements InvocationFlow.InvocationOutputSubscriber {
    private static final Logger LOG = LogManager.getLogger(HttpResponseFlowAdapter.class);
    private final HttpServerResponse httpServerResponse;
    private Flow.Subscription outputSubscription;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HttpResponseFlowAdapter(HttpServerResponse httpServerResponse) {
        this.httpServerResponse = httpServerResponse;
        this.httpServerResponse.exceptionHandler(this::propagateWireFailure);
    }

    public void onSubscribe(Flow.Subscription subscription) {
        this.outputSubscription = subscription;
        this.outputSubscription.request(Long.MAX_VALUE);
    }

    public void onNext(MessageLite messageLite) {
        write(messageLite);
    }

    public void onError(Throwable th) {
        propagatePublisherFailure(th);
    }

    public void onComplete() {
        endResponse();
    }

    private void write(MessageLite messageLite) {
        if (this.httpServerResponse.closed()) {
            cancelSubscription();
            return;
        }
        LOG.trace("Writing response message " + messageLite);
        Buffer buffer = Buffer.buffer(MessageEncoder.encodeLength(messageLite));
        MessageEncoder.encode(buffer, messageLite);
        this.httpServerResponse.write(buffer);
    }

    private void propagateWireFailure(Throwable th) {
        LOG.warn("Error from wire", th);
        endResponse();
    }

    private void propagatePublisherFailure(Throwable th) {
        if (!this.httpServerResponse.headWritten()) {
            Util.findProtocolException(th).ifPresentOrElse(protocolException -> {
                this.httpServerResponse.setStatusCode(protocolException.getFailureCode() == Status.Code.NOT_FOUND.value() ? 404 : 500);
            }, () -> {
                this.httpServerResponse.setStatusCode(500);
            });
        }
        LOG.warn("Error from publisher", th);
        endResponse();
    }

    private void endResponse() {
        LOG.trace("Closing response");
        if (!this.httpServerResponse.ended()) {
            this.httpServerResponse.end();
        }
        cancelSubscription();
    }

    private void cancelSubscription() {
        LOG.trace("Cancelling subscription");
        if (this.outputSubscription != null) {
            Flow.Subscription subscription = this.outputSubscription;
            this.outputSubscription = null;
            subscription.cancel();
        }
    }
}
