package dev.restate.sdk.http.vertx;

import com.google.protobuf.InvalidProtocolBufferException;
import dev.restate.generated.service.discovery.Discovery;
import dev.restate.sdk.core.InvocationFlow;
import dev.restate.sdk.core.InvocationHandler;
import dev.restate.sdk.core.ProtocolException;
import dev.restate.sdk.core.RestateGrpcServer;
import io.grpc.Status;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.AsciiString;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.reactiverse.contextual.logging.ContextualData;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dev/restate/sdk/http/vertx/RequestHttpServerHandler.class */
class RequestHttpServerHandler implements Handler<HttpServerRequest> {
    private static final String APPLICATION_PROTO = "application/proto";
    private static final String DISCOVER_PATH = "/discover";
    private final RestateGrpcServer restateGrpcServer;
    private final HashMap<String, Executor> blockingServices;
    private final OpenTelemetry openTelemetry;
    private static final Logger LOG = LogManager.getLogger(RequestHttpServerHandler.class);
    private static final AsciiString APPLICATION_RESTATE = AsciiString.cached("application/restate");
    private static final Pattern SLASH = Pattern.compile(Pattern.quote("/"));
    static TextMapGetter<MultiMap> OTEL_TEXT_MAP_GETTER = new TextMapGetter<MultiMap>() { // from class: dev.restate.sdk.http.vertx.RequestHttpServerHandler.1
        public Iterable<String> keys(MultiMap multiMap) {
            return multiMap.names();
        }

        @Nullable
        public String get(@Nullable MultiMap multiMap, String str) {
            if (multiMap == null) {
                return null;
            }
            return multiMap.get(str);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public RequestHttpServerHandler(RestateGrpcServer restateGrpcServer, HashMap<String, Executor> hashMap, OpenTelemetry openTelemetry) {
        this.restateGrpcServer = restateGrpcServer;
        this.blockingServices = hashMap;
        this.openTelemetry = openTelemetry;
    }

    public void handle(HttpServerRequest httpServerRequest) {
        URI create = URI.create(httpServerRequest.uri());
        if (DISCOVER_PATH.equalsIgnoreCase(create.getPath())) {
            handleDiscoveryRequest(httpServerRequest);
            return;
        }
        String[] split = SLASH.split(create.getPath());
        if (split.length < 3) {
            LOG.warn("Path doesn't match the pattern /invoke/SvcName/MethodName nor /discover: '{}'", httpServerRequest.path());
            httpServerRequest.response().setStatusCode(HttpResponseStatus.NOT_FOUND.code()).end();
            return;
        }
        String str = split[split.length - 2];
        String str2 = split[split.length - 1];
        boolean containsKey = this.blockingServices.containsKey(str);
        try {
            InvocationHandler resolve = this.restateGrpcServer.resolve(str, str2, this.openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), httpServerRequest.headers(), OTEL_TEXT_MAP_GETTER), new RestateGrpcServer.LoggingContextSetter() { // from class: dev.restate.sdk.http.vertx.RequestHttpServerHandler.2
                public void setServiceMethod(String str3) {
                    ContextualData.put("restateServiceMethod", str3);
                }

                public void setInvocationId(String str3) {
                    ContextualData.put("restateInvocationId", str3);
                }

                public void setInvocationStatus(String str3) {
                    ContextualData.put("restateInvocationStatus", str3);
                }
            }, containsKey ? currentContextExecutor(((HttpServerRequestInternal) httpServerRequest).context()) : null, containsKey ? blockingExecutor(str) : null);
            LOG.debug("Handling request to " + str + "/" + str2);
            HttpServerResponse response = httpServerRequest.response();
            response.setStatusCode(HttpResponseStatus.OK.code());
            response.putHeader(HttpHeaderNames.CONTENT_TYPE, APPLICATION_RESTATE);
            response.setChunked(true);
            HttpRequestFlowAdapter httpRequestFlowAdapter = new HttpRequestFlowAdapter(httpServerRequest);
            InvocationFlow.InvocationOutputSubscriber httpResponseFlowAdapter = new HttpResponseFlowAdapter(response);
            httpRequestFlowAdapter.subscribe(resolve.input());
            resolve.output().subscribe(httpResponseFlowAdapter);
            resolve.start();
        } catch (ProtocolException e) {
            LOG.warn("Error when resolving the grpc handler", e);
            httpServerRequest.response().setStatusCode(e.getFailureCode() == Status.Code.NOT_FOUND.value() ? HttpResponseStatus.NOT_FOUND.code() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
        }
    }

    private Executor currentContextExecutor(io.vertx.core.Context context) {
        return runnable -> {
            context.runOnContext(r3 -> {
                runnable.run();
            });
        };
    }

    private Executor blockingExecutor(String str) {
        return this.blockingServices.get(str);
    }

    private void handleDiscoveryRequest(HttpServerRequest httpServerRequest) {
        if (!httpServerRequest.method().equals(HttpMethod.POST)) {
            httpServerRequest.response().setStatusCode(HttpResponseStatus.METHOD_NOT_ALLOWED.code()).end();
        } else if (httpServerRequest.getHeader(HttpHeaderNames.CONTENT_TYPE).equalsIgnoreCase(APPLICATION_PROTO)) {
            httpServerRequest.body().andThen(asyncResult -> {
                if (asyncResult.failed()) {
                    LOG.warn("Error when reading the request body of discovery request", asyncResult.cause());
                    httpServerRequest.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
                    return;
                }
                try {
                    httpServerRequest.response().setStatusCode(HttpResponseStatus.OK.code()).putHeader(HttpHeaderNames.CONTENT_TYPE, APPLICATION_PROTO).end(Buffer.buffer(Unpooled.wrappedBuffer(this.restateGrpcServer.handleDiscoveryRequest(Discovery.ServiceDiscoveryRequest.parseFrom(((Buffer) asyncResult.result()).getByteBuf().nioBuffer())).toByteString().asReadOnlyByteBuffer())));
                } catch (InvalidProtocolBufferException e) {
                    LOG.warn("Cannot parse discovery request", e);
                    httpServerRequest.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
                }
            });
        } else {
            httpServerRequest.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
        }
    }
}
