package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

@SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "Feature flag comes as a Json configuration, hence they must be exposed")
/* loaded from: input_file:dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.class */
public class FlagStore implements Storage {
    private static final Logger log = Logger.getLogger(FlagStore.class.getName());
    private final ReentrantReadWriteLock sync = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = this.sync.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = this.sync.writeLock();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<StorageState> stateBlockingQueue = new LinkedBlockingQueue(1);
    private final Map<String, FeatureFlag> flags = new HashMap();
    private final Connector connector;

    public FlagStore(Connector connector) {
        this.connector = connector;
    }

    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage
    public void init() {
        this.connector.init();
        Thread thread = new Thread(() -> {
            try {
                streamerListener(this.connector);
            } catch (InterruptedException e) {
                log.log(Level.WARNING, "connection listener failed", (Throwable) e);
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage
    public void shutdown() throws InterruptedException {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        this.connector.shutdown();
    }

    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage
    public FeatureFlag getFlag(String str) {
        this.readLock.lock();
        try {
            return this.flags.get(str);
        } finally {
            this.readLock.unlock();
        }
    }

    @Override // dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage
    public BlockingQueue<StorageState> getStateQueue() {
        return this.stateBlockingQueue;
    }

    private void streamerListener(Connector connector) throws InterruptedException {
        BlockingQueue<StreamPayload> stream = connector.getStream();
        while (!this.shutdown.get()) {
            StreamPayload take = stream.take();
            switch (take.getType()) {
                case DATA:
                    try {
                        Map<String, FeatureFlag> parseString = FlagParser.parseString(take.getData());
                        this.writeLock.lock();
                        try {
                            this.flags.clear();
                            this.flags.putAll(parseString);
                            this.writeLock.unlock();
                            if (!this.stateBlockingQueue.offer(StorageState.OK)) {
                                log.log(Level.WARNING, "Failed to convey OK satus, queue is full");
                            }
                            break;
                        } catch (Throwable th) {
                            this.writeLock.unlock();
                            throw th;
                            break;
                        }
                    } catch (Throwable th2) {
                        log.log(Level.WARNING, "Invalid flag sync payload from connector", th2);
                        if (!this.stateBlockingQueue.offer(StorageState.STALE)) {
                            log.log(Level.WARNING, "Failed to convey STALE satus, queue is full");
                            break;
                        } else {
                            break;
                        }
                    }
                case ERROR:
                    if (!this.stateBlockingQueue.offer(StorageState.ERROR)) {
                        log.log(Level.WARNING, "Failed to convey ERROR satus, queue is full");
                        break;
                    } else {
                        break;
                    }
                default:
                    log.log(Level.INFO, String.format("Payload with unknown type: %s", take.getType()));
                    break;
            }
        }
        log.log(Level.INFO, "Shutting down store stream listener");
    }
}
