package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
import dev.openfeature.flagd.grpc.sync.Sync;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, justification = "Random is used to generate a variation & flag configurations require exposing")
/**
 * {@link Connector} that subscribes to the flag sync gRPC service and republishes every
 * received flag configuration (or stream error) as a {@link StreamPayload} on a bounded queue.
 *
 * <p>The stream is observed on a daemon thread started by {@link #init()}. When the stream
 * completes or fails, the subscription is re-established after an exponential back-off with
 * random jitter, until {@link #shutdown()} is invoked.
 */
public class GrpcStreamConnector implements Connector {
    private static final Logger log = LoggerFactory.getLogger(GrpcStreamConnector.class);
    private static final Random RANDOM = new Random();

    /** Initial reconnect delay in milliseconds; also the upper bound of the random jitter. */
    private static final int INIT_BACK_OFF = 2000;

    /** Largest reconnect delay (before jitter) in milliseconds. */
    private static final int MAX_BACK_OFF = 120000;

    /** Capacity of both the outgoing payload queue and the internal stream receiver queue. */
    private static final int QUEUE_SIZE = 5;

    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<StreamPayload> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
    private final ManagedChannel channel;
    private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
    private final int deadline;
    private final String selector;

    /**
     * Builds the gRPC channel and asynchronous stub from the given options.
     *
     * @param flagdOptions options supplying the channel configuration, the deadline (used as
     *     milliseconds when awaiting channel termination) and the optional selector
     */
    public GrpcStreamConnector(FlagdOptions flagdOptions) {
        this.channel = ChannelBuilder.nettyChannel(flagdOptions);
        this.serviceStub = FlagSyncServiceGrpc.newStub(this.channel);
        this.deadline = flagdOptions.getDeadline();
        this.selector = flagdOptions.getSelector();
    }

    /**
     * Starts a daemon thread that observes the sync stream until shutdown or interruption.
     * The selector is only set on the request when one was configured.
     */
    @Override
    public void init() {
        Thread listener = new Thread(() -> {
            try {
                Sync.SyncFlagsRequest.Builder requestBuilder = Sync.SyncFlagsRequest.newBuilder();
                if (this.selector != null) {
                    requestBuilder.setSelector(this.selector);
                }
                observeEventStream(this.blockingQueue, this.shutdown, this.serviceStub, requestBuilder.build());
            } catch (InterruptedException e) {
                log.warn("gRPC event stream interrupted, flag configurations are stale", e);
                // Restore the interrupt flag so the thread terminates in an interrupted state.
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    /**
     * Returns the queue on which DATA and ERROR payloads are published.
     *
     * @return the connector's payload queue (the internal instance, not a copy)
     */
    @Override
    public BlockingQueue<StreamPayload> getStream() {
        return this.blockingQueue;
    }

    /**
     * Signals the stream listener to stop and shuts the channel down. Only the first call has
     * an effect.
     *
     * <p>A graceful shutdown is attempted first and given {@code deadline} milliseconds to
     * terminate. If the channel has still not terminated (or the wait failed), the channel is
     * shut down forcibly and a warning is logged.
     *
     * @throws InterruptedException if interrupted while awaiting channel termination
     */
    @Override
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        if (this.channel == null) {
            return;
        }
        try {
            if (!this.channel.isShutdown()) {
                this.channel.shutdown();
                this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
            }
        } finally {
            // isShutdown() is already true right after shutdown(); only isTerminated() tells
            // whether the graceful shutdown actually finished within the deadline.
            if (!this.channel.isTerminated()) {
                this.channel.shutdownNow();
                this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
                log.warn(String.format("Unable to shut down channel by %d deadline", Integer.valueOf(this.deadline)));
            }
        }
    }

    /**
     * Subscribes to the sync stream and forwards its events until {@code atomicBoolean} is set.
     *
     * <p>Each received flag configuration is offered to {@code blockingQueue} as a DATA payload
     * and resets the back-off. A stream error is offered as an ERROR payload. After an error or
     * a completed stream, the method sleeps for the current back-off plus a random jitter below
     * {@code INIT_BACK_OFF} ms, doubles the back-off (capped at {@code MAX_BACK_OFF}) and
     * subscribes again. Payloads that do not fit into the queue are dropped with a warning.
     *
     * @param blockingQueue queue receiving the DATA / ERROR payloads
     * @param atomicBoolean shutdown flag; once {@code true} the method returns
     * @param flagSyncServiceStub stub used to open the sync stream
     * @param syncFlagsRequest request sent on every (re)subscription
     * @throws InterruptedException if interrupted while waiting for a stream event or sleeping
     */
    static void observeEventStream(BlockingQueue<StreamPayload> blockingQueue, AtomicBoolean atomicBoolean, FlagSyncServiceGrpc.FlagSyncServiceStub flagSyncServiceStub, Sync.SyncFlagsRequest syncFlagsRequest) throws InterruptedException {
        final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
        int retryDelay = INIT_BACK_OFF;

        while (!atomicBoolean.get()) {
            flagSyncServiceStub.syncFlags(syncFlagsRequest, new GrpcStreamHandler(streamReceiver));

            while (!atomicBoolean.get()) {
                final GrpcResponseModel response = streamReceiver.take();

                if (response.isComplete()) {
                    // Stream ended normally; fall through to the back-off and reconnect.
                    break;
                }

                if (response.getError() != null) {
                    log.warn(String.format("Error from grpc connection, retrying in %dms", Integer.valueOf(retryDelay)), response.getError());
                    if (!blockingQueue.offer(new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) {
                        log.warn("Failed to convey ERROR satus, queue is full");
                    }
                    // A gRPC stream error is terminal (presumably relayed as-is by
                    // GrpcStreamHandler), so nothing more will arrive on this subscription:
                    // leave the read loop to actually perform the announced retry.
                    break;
                }

                if (!blockingQueue.offer(new StreamPayload(StreamPayloadType.DATA, response.getSyncFlagsResponse().getFlagConfiguration()))) {
                    log.warn("Stream writing failed");
                }
                // A successful message means the connection is healthy again.
                retryDelay = INIT_BACK_OFF;
            }

            // Skip the back-off sleep when we are shutting down.
            if (atomicBoolean.get()) {
                break;
            }

            Thread.sleep(retryDelay + RANDOM.nextInt(INIT_BACK_OFF));
            // Exponential back-off, clamped so the delay never exceeds MAX_BACK_OFF.
            retryDelay = Math.min(2 * retryDelay, MAX_BACK_OFF);
        }
        log.info("Shutdown invoked, exiting event stream listener");
    }
}
