package org.springframework.batch.item.redis.support;

import io.micrometer.core.instrument.Tag;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.redis.support.KeyValue;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/support/AbstractKeyValueItemReader.class */
/**
 * Base item reader that produces {@link KeyValue} items from a stream of keys.
 *
 * <p>Keys come from the supplied key reader and are handed, through a {@link Transfer} named
 * {@code "value-reader"}, to an internal writer ({@link ValueEnqueuer}). That writer resolves each
 * batch of keys with the {@link ValueReader} and places the results on a bounded queue, from which
 * {@link #doRead()} takes items.
 *
 * @param <T> the concrete key/value type produced by this reader
 */
public abstract class AbstractKeyValueItemReader<T extends KeyValue<?>> extends AbstractItemCountingItemStreamItemReader<T> implements BoundedItemReader<T>, FlushableItemStreamReader<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractKeyValueItemReader.class);
    private final ItemReader<String> keyReader;
    private final ValueReader<T> valueReader;
    private final BlockingQueue<T> queue;
    private final Transfer<String, String> transfer;
    private Tag nameTag;
    private long queuePollingTimeout;
    private TransferExecution<String, String> transferExecution;
    private CompletableFuture<Void> transferFuture;

    /**
     * Writer side of the key transfer: turns batches of keys into values and enqueues them for
     * {@link AbstractKeyValueItemReader#doRead()}. Stream lifecycle calls are forwarded to the
     * enclosing reader's {@link ValueReader}.
     */
    private class ValueEnqueuer implements ItemStreamWriter<String> {
        private ValueEnqueuer() {
        }

        /** Opens the underlying value reader. */
        public void open(ExecutionContext executionContext) {
            AbstractKeyValueItemReader.this.valueReader.open(executionContext);
        }

        /** Closes the underlying value reader. */
        public void close() throws ItemStreamException {
            AbstractKeyValueItemReader.this.valueReader.close();
        }

        /** Forwards the execution-context update to the underlying value reader. */
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            AbstractKeyValueItemReader.this.valueReader.update(executionContext);
        }

        /**
         * Reads the values for the given keys and enqueues them.
         *
         * @param list keys to resolve
         * @throws Exception if the value reader fails or the thread is interrupted while waiting
         *     for queue capacity
         */
        public void write(List<? extends String> list) throws Exception {
            for (T value : AbstractKeyValueItemReader.this.valueReader.read(list)) {
                // Drop any not-yet-consumed entry for the same key so only the newest value stays queued.
                AbstractKeyValueItemReader.this.queue.removeIf(queued -> queued.getKey().equals(value.getKey()));
                // put() blocks while the bounded queue is full.
                AbstractKeyValueItemReader.this.queue.put(value);
            }
        }
    }

    /**
     * Creates the reader and wires the key reader to the value queue.
     *
     * @param itemReader source of keys; must not be {@code null}
     * @param valueReader resolves batches of keys into values; must not be {@code null}
     * @param transferOptions options passed to the key transfer; must not be {@code null}
     * @param queueOptions supplies the queue capacity and polling timeout; must not be {@code null}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public AbstractKeyValueItemReader(ItemReader<String> itemReader, ValueReader<T> valueReader, TransferOptions transferOptions, QueueOptions queueOptions) {
        setName(ClassUtils.getShortName(getClass()));
        Assert.notNull(itemReader, "A key reader is required.");
        Assert.notNull(valueReader, "A value reader is required.");
        Assert.notNull(transferOptions, "Transfer options are required.");
        Assert.notNull(queueOptions, "Queue options are required.");
        this.keyReader = itemReader;
        this.valueReader = valueReader;
        this.transfer = Transfer.builder().name("value-reader").reader(itemReader).writer(new ValueEnqueuer()).options(transferOptions).build();
        this.queue = new LinkedBlockingDeque<>(queueOptions.getCapacity());
        this.queuePollingTimeout = queueOptions.getPollingTimeout().toMillis();
    }

    /**
     * Sets the reader name and refreshes the {@code name} metrics tag derived from it.
     *
     * @param str the new name
     */
    public void setName(String str) {
        this.nameTag = Tag.of("name", str);
        super.setName(str);
    }

    /** Registers the queue-size gauge and starts the key transfer. */
    protected void doOpen() {
        MetricsUtils.createGaugeCollectionSize("reader.queue.size", this.queue, this.nameTag);
        this.transferExecution = new TransferExecution<>(this.transfer);
        this.transferFuture = this.transferExecution.start();
    }

    /**
     * Stops the key transfer and waits for it to finish. Does nothing beyond the queue warning if
     * the reader was never successfully opened.
     *
     * @throws InterruptedException if interrupted while waiting for the transfer to finish
     * @throws ExecutionException if the transfer completed exceptionally
     */
    protected void doClose() throws ItemStreamException, InterruptedException, ExecutionException {
        if (!this.queue.isEmpty()) {
            log.warn("Closing {} - {} items still in queue", ClassUtils.getShortName(getClass()), this.queue.size());
        }
        if (this.transferExecution == null) {
            // doOpen() never ran (or failed before starting the transfer): nothing to stop.
            return;
        }
        log.info("Stopping key transfer");
        this.transferExecution.stop();
        if (this.transferFuture != null) {
            log.info("Waiting for key transfer to finish");
            this.transferFuture.get();
        }
    }

    /**
     * Takes the next value from the queue, waiting in polling-timeout slices until either an item
     * arrives or the key transfer reports termination.
     *
     * @return the next item, or {@code null} once the transfer has terminated and the queue is empty
     * @throws Exception if interrupted while waiting on the queue
     */
    protected T doRead() throws Exception {
        T item;
        do {
            item = this.queue.poll(this.queuePollingTimeout, TimeUnit.MILLISECONDS);
            if (item != null) {
                return item;
            }
        } while (!this.transferExecution.isTerminated());
        // An item may have been enqueued between the timed-out poll and the termination check;
        // a last non-blocking poll avoids signalling end-of-stream while it is still queued.
        // NOTE(review): assumes nothing is enqueued after isTerminated() returns true - confirm.
        return this.queue.poll();
    }

    /**
     * Alias of {@link #doRead()} kept for callers that reference the decompiler-assigned name.
     *
     * @return the next item, or {@code null} at end of stream
     * @throws Exception if interrupted while waiting on the queue
     */
    public T m5doRead() throws Exception {
        return doRead();
    }

    /** Flushes the running key transfer. */
    @Override // org.springframework.batch.item.redis.support.FlushableItemStreamReader
    public void flush() {
        this.transferExecution.flush();
    }

    /**
     * Reports how many keys the key reader has available.
     *
     * @return the key reader's own {@code available()} value when it is a {@link BoundedItemReader},
     *     otherwise {@code 0}
     */
    @Override // org.springframework.batch.item.redis.support.BoundedItemReader
    public int available() {
        if (this.keyReader instanceof BoundedItemReader) {
            return ((BoundedItemReader<?>) this.keyReader).available();
        }
        return 0;
    }

    /**
     * Returns the key reader supplied at construction time.
     *
     * @return the key reader
     */
    public ItemReader<String> getKeyReader() {
        return this.keyReader;
    }
}
