package org.springframework.batch.item.redis.support;

import com.hybhub.util.concurrent.ConcurrentSetBlockingQueue;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Tag;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/support/LiveKeyItemReader.class */
/**
 * Item reader that emits Redis keys as they are touched, by subscribing to a keyspace
 * notification pattern and buffering the notified keys in a bounded queue.
 *
 * <p>The reader registers itself as a pub/sub listener (standalone and cluster flavours) in
 * {@link #doOpen()}, enqueues the key part of every notified channel, and hands keys out one
 * at a time from the read method until {@link #stop()} is called and the queue is drained.
 *
 * <p>Threading: listener callbacks and {@link #stop()} are expected to run on threads other
 * than the one reading, which is why the stop flag is {@code volatile} and the buffer is a
 * {@link BlockingQueue}.
 */
public class LiveKeyItemReader extends AbstractProgressReportingItemReader<String> implements RedisPubSubListener<String, String>, RedisClusterPubSubListener<String, String> {
    private static final Logger log = LoggerFactory.getLogger(LiveKeyItemReader.class);

    /** Channel pattern template: first argument is the database index, second the key pattern. */
    private static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";

    private final AbstractRedisClient client;
    private final String pubSubPattern;
    private final BlockingQueue<String> queue;
    private final long queuePollingTimeout;
    private StatefulRedisPubSubConnection<String, String> pubSubConnection;

    /**
     * Set by {@link #stop()} from an arbitrary thread and polled by the read loop; must be
     * volatile because {@code stop()} does not take the monitor held by the reader.
     */
    private volatile boolean stopped;
    private Tag nameTag;

    /**
     * Creates a reader bound to the given client.
     *
     * @param abstractRedisClient  client used to open the pub/sub connection; must not be null
     * @param liveKeyReaderOptions source of the queue capacity, polling timeout, database and
     *                             key pattern; must not be null
     * @throws IllegalArgumentException if either argument is null
     */
    public LiveKeyItemReader(AbstractRedisClient abstractRedisClient, LiveKeyReaderOptions liveKeyReaderOptions) {
        // Validate before touching any state so a bad call fails without side effects.
        Assert.notNull(abstractRedisClient, "A Redis client is required.");
        Assert.notNull(liveKeyReaderOptions, "Options are required.");
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.queue = new ConcurrentSetBlockingQueue(liveKeyReaderOptions.getQueueOptions().getCapacity());
        this.queuePollingTimeout = liveKeyReaderOptions.getQueueOptions().getPollingTimeout().toMillis();
        this.pubSubPattern = String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(liveKeyReaderOptions.getDatabase()), liveKeyReaderOptions.getKeyPattern());
    }

    /**
     * Sets the reader name and refreshes the metrics tag derived from it.
     *
     * @param str the new name
     */
    public void setName(String str) {
        this.nameTag = Tag.of("name", str);
        super.setName(str);
    }

    /**
     * Opens the pub/sub connection, registers the queue-size gauge, attaches this reader as
     * listener and subscribes to the keyspace pattern. On a cluster connection the
     * subscription is issued on the upstream nodes with node message propagation enabled.
     */
    protected synchronized void doOpen() throws InterruptedException, ExecutionException, TimeoutException {
        this.pubSubConnection = ClientUtils.pubSubConnection(this.client);
        MetricsUtils.createGaugeCollectionSize("livekeyreader.queue.size", this.queue, this.nameTag);
        log.info("Subscribing to pub/sub pattern {}, queue capacity: {}", this.pubSubPattern, this.queue.remainingCapacity());
        if (this.pubSubConnection instanceof StatefulRedisClusterPubSubConnection) {
            StatefulRedisClusterPubSubConnection<String, String> clusterConnection =
                    (StatefulRedisClusterPubSubConnection<String, String>) this.pubSubConnection;
            clusterConnection.addListener((RedisClusterPubSubListener<String, String>) this);
            clusterConnection.setNodeMessagePropagation(true);
            NodeSelectionPubSubCommands<String, String> upstreamCommands = clusterConnection.sync().upstream().commands();
            upstreamCommands.psubscribe(this.pubSubPattern);
        } else {
            this.pubSubConnection.addListener((RedisPubSubListener<String, String>) this);
            this.pubSubConnection.sync().psubscribe(this.pubSubPattern);
        }
    }

    /**
     * Unsubscribes from the keyspace pattern, detaches the listener, closes the connection and
     * empties the queue. Does nothing to the connection if none is open, so it is safe to call
     * before {@link #doOpen()} or more than once.
     */
    protected synchronized void doClose() {
        StatefulRedisPubSubConnection<String, String> connection = this.pubSubConnection;
        if (connection == null) {
            // Never opened, or already closed: only the buffer may need clearing.
            this.queue.clear();
            return;
        }
        log.info("Unsubscribing from pub/sub pattern {}", this.pubSubPattern);
        try {
            if (connection instanceof StatefulRedisClusterPubSubConnection) {
                StatefulRedisClusterPubSubConnection<String, String> clusterConnection =
                        (StatefulRedisClusterPubSubConnection<String, String>) connection;
                NodeSelectionPubSubCommands<String, String> upstreamCommands = clusterConnection.sync().upstream().commands();
                upstreamCommands.punsubscribe(this.pubSubPattern);
                clusterConnection.removeListener((RedisClusterPubSubListener<String, String>) this);
            } else {
                connection.sync().punsubscribe(this.pubSubPattern);
                connection.removeListener((RedisPubSubListener<String, String>) this);
            }
        } finally {
            // Release the connection even when unsubscribing fails, otherwise it leaks.
            try {
                connection.close();
            } finally {
                this.pubSubConnection = null;
                this.queue.clear();
            }
        }
    }

    /** Standalone channel message: the channel name carries the key. */
    public void message(String channel, String message) {
        notification(channel);
    }

    /** Standalone pattern message: delegates to the channel variant, ignoring the pattern. */
    public void message(String pattern, String channel, String message) {
        message(channel, message);
    }

    /** Subscription lifecycle callback; intentionally a no-op. */
    public void subscribed(String channel, long count) {
    }

    /** Subscription lifecycle callback; intentionally a no-op. */
    public void psubscribed(String pattern, long count) {
    }

    /** Subscription lifecycle callback; intentionally a no-op. */
    public void unsubscribed(String channel, long count) {
    }

    /** Subscription lifecycle callback; intentionally a no-op. */
    public void punsubscribed(String pattern, long count) {
    }

    /** Cluster channel message: the channel name carries the key; the node is ignored. */
    public void message(RedisClusterNode redisClusterNode, String channel, String message) {
        notification(channel);
    }

    /** Cluster pattern message: the channel name carries the key; node and pattern are ignored. */
    public void message(RedisClusterNode redisClusterNode, String pattern, String channel, String message) {
        notification(channel);
    }

    /** Cluster subscription lifecycle callback; intentionally a no-op. */
    public void subscribed(RedisClusterNode redisClusterNode, String channel, long count) {
    }

    /** Cluster subscription lifecycle callback; intentionally a no-op. */
    public void psubscribed(RedisClusterNode redisClusterNode, String pattern, long count) {
    }

    /** Cluster subscription lifecycle callback; intentionally a no-op. */
    public void unsubscribed(RedisClusterNode redisClusterNode, String channel, long count) {
    }

    /** Cluster subscription lifecycle callback; intentionally a no-op. */
    public void punsubscribed(RedisClusterNode redisClusterNode, String pattern, long count) {
    }

    /**
     * Asks the reader to finish: once the flag is seen, the read loop returns {@code null} the
     * next time a poll times out with an empty queue. May be called from any thread.
     */
    public void stop() {
        this.stopped = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: doRead, reason: merged with bridge method [inline-methods] */
    /**
     * Blocks until a key is available or the reader has been stopped.
     *
     * <p>The method name is a decompiler artifact (see the note above); it is kept as-is so
     * existing references keep resolving.
     *
     * @return the next notified key, or {@code null} if a poll timed out after {@link #stop()}
     * @throws Exception if the thread is interrupted while waiting on the queue
     */
    public synchronized String m19doRead() throws Exception {
        String key;
        do {
            key = this.queue.poll(this.queuePollingTimeout, TimeUnit.MILLISECONDS);
        } while (key == null && !this.stopped);
        return key;
    }

    /**
     * Strips the channel prefix: returns everything after the first occurrence of
     * {@code KeyMaker.DEFAULT_SEPARATOR}, or the whole string if the separator is absent.
     */
    private String key(String channel) {
        int separatorIndex = channel.indexOf(KeyMaker.DEFAULT_SEPARATOR);
        return channel.substring(separatorIndex + 1);
    }

    /**
     * Enqueues the key extracted from a notified channel; null channels are ignored.
     * The result of {@code offer} is not checked, so a rejected key is dropped silently.
     */
    private void notification(String channel) {
        if (channel != null) {
            this.queue.offer(key(channel));
        }
    }
}
