package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.backoff.GrpcStreamConnectorBackoffService;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
import dev.openfeature.flagd.grpc.sync.Sync;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, justification = "Random is used to generate a variation & flag configurations require exposing")
/* loaded from: input_file:dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.class */
public class GrpcStreamConnector implements Connector {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcStreamConnector.class);
    private static final int QUEUE_SIZE = 5;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<QueuePayload> blockingQueue = new LinkedBlockingQueue(5);
    private final ManagedChannel channel;
    private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
    private final FlagSyncServiceGrpc.FlagSyncServiceBlockingStub serviceBlockingStub;
    private final int deadline;
    private final int streamDeadlineMs;
    private final String selector;
    private final int retryBackoffMillis;

    public GrpcStreamConnector(FlagdOptions flagdOptions) {
        this.channel = ChannelBuilder.nettyChannel(flagdOptions);
        this.serviceStub = FlagSyncServiceGrpc.newStub(this.channel);
        this.serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(this.channel);
        this.deadline = flagdOptions.getDeadline();
        this.streamDeadlineMs = flagdOptions.getStreamDeadlineMs();
        this.selector = flagdOptions.getSelector();
        this.retryBackoffMillis = flagdOptions.getRetryBackoffMs();
    }

    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector
    public void init() {
        Thread thread = new Thread(() -> {
            try {
                observeEventStream(this.blockingQueue, this.shutdown, this.serviceStub, this.serviceBlockingStub, this.selector, this.deadline, this.streamDeadlineMs, this.retryBackoffMillis);
            } catch (InterruptedException e) {
                log.warn("gRPC event stream interrupted, flag configurations are stale", e);
                Thread.currentThread().interrupt();
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector
    public BlockingQueue<QueuePayload> getStream() {
        return this.blockingQueue;
    }

    /* JADX WARN: Finally extract failed */
    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        try {
            if (this.channel != null && !this.channel.isShutdown()) {
                this.channel.shutdown();
                this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
            }
            if (this.channel == null || this.channel.isShutdown()) {
                return;
            }
            this.channel.shutdownNow();
            this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
            log.warn(String.format("Unable to shut down channel by %d deadline", Integer.valueOf(this.deadline)));
        } catch (Throwable th) {
            if (this.channel != null && !this.channel.isShutdown()) {
                this.channel.shutdownNow();
                this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
                log.warn(String.format("Unable to shut down channel by %d deadline", Integer.valueOf(this.deadline)));
            }
            throw th;
        }
    }

    static void observeEventStream(BlockingQueue<QueuePayload> blockingQueue, AtomicBoolean atomicBoolean, FlagSyncServiceGrpc.FlagSyncServiceStub flagSyncServiceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub flagSyncServiceBlockingStub, String str, int i, int i2, int i3) throws InterruptedException {
        Throwable error;
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(5);
        GrpcStreamConnectorBackoffService grpcStreamConnectorBackoffService = new GrpcStreamConnectorBackoffService(i3);
        log.info("Initializing sync stream observer");
        while (!atomicBoolean.get()) {
            blockingQueue.clear();
            Exception exc = null;
            log.debug("Initializing sync stream request");
            Sync.SyncFlagsRequest.Builder newBuilder = Sync.SyncFlagsRequest.newBuilder();
            Sync.GetMetadataRequest.Builder newBuilder2 = Sync.GetMetadataRequest.newBuilder();
            Sync.GetMetadataResponse defaultInstance = Sync.GetMetadataResponse.getDefaultInstance();
            if (str != null) {
                newBuilder.setSelector(str);
            }
            Context.CancellableContext withCancellation = Context.current().withCancellation();
            Throwable th = null;
            FlagSyncServiceGrpc.FlagSyncServiceStub flagSyncServiceStub2 = flagSyncServiceStub;
            if (i2 > 0) {
                try {
                    try {
                        flagSyncServiceStub2 = (FlagSyncServiceGrpc.FlagSyncServiceStub) flagSyncServiceStub2.withDeadlineAfter(i2, TimeUnit.MILLISECONDS);
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (withCancellation != null) {
                        if (th != null) {
                            try {
                                withCancellation.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            withCancellation.close();
                        }
                    }
                    throw th2;
                }
            }
            flagSyncServiceStub2.syncFlags(newBuilder.m1692build(), new GrpcStreamHandler(linkedBlockingQueue));
            try {
                defaultInstance = flagSyncServiceBlockingStub.withDeadlineAfter(i, TimeUnit.MILLISECONDS).getMetadata(newBuilder2.m1598build());
            } catch (Exception e) {
                exc = e;
            }
            while (true) {
                if (atomicBoolean.get()) {
                    break;
                }
                GrpcResponseModel grpcResponseModel = (GrpcResponseModel) linkedBlockingQueue.take();
                if (grpcResponseModel.isComplete()) {
                    log.info("Sync stream completed");
                    break;
                }
                error = grpcResponseModel.getError();
                if (error != null || exc != null) {
                    break;
                }
                String flagConfiguration = grpcResponseModel.getSyncFlagsResponse().getFlagConfiguration();
                log.debug("Got stream response: {}", flagConfiguration);
                if (!blockingQueue.offer(new QueuePayload(QueuePayloadType.DATA, flagConfiguration, defaultInstance))) {
                    log.error("Stream writing failed");
                }
                grpcStreamConnectorBackoffService.reset();
            }
            long currentBackoffMillis = grpcStreamConnectorBackoffService.getCurrentBackoffMillis();
            if (grpcStreamConnectorBackoffService.shouldRetrySilently()) {
                logExceptions(Level.INFO, error, exc, currentBackoffMillis);
            } else {
                logExceptions(Level.ERROR, error, exc, currentBackoffMillis);
                if (!blockingQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata", defaultInstance))) {
                    log.error("Failed to convey ERROR status, queue is full");
                }
            }
            withCancellation.cancel(exc);
            if (withCancellation != null) {
                if (0 != 0) {
                    try {
                        withCancellation.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    withCancellation.close();
                }
            }
            if (!atomicBoolean.get()) {
                log.debug("Stream failed, retrying in {}ms", Long.valueOf(grpcStreamConnectorBackoffService.getCurrentBackoffMillis()));
                grpcStreamConnectorBackoffService.waitUntilNextAttempt();
            }
        }
        log.info("Shutdown invoked, exiting event stream listener");
    }

    private static void logExceptions(Level level, Throwable th, Exception exc, long j) {
        if (th != null) {
            log.atLevel(level).setCause(th).log("Error initializing stream, retrying in {}ms", Long.valueOf(j));
        }
        if (exc != null) {
            log.atLevel(level).setCause(exc).log("Error initializing metadata, retrying in {}ms", Long.valueOf(j));
        }
    }
}
