package org.springframework.batch.item.redis;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
import org.springframework.batch.item.redis.support.ClientUtils;
import org.springframework.batch.item.redis.support.RedisClientBuilder;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader.class */
public class StreamItemReader extends AbstractItemCountingItemStreamItemReader<StreamMessage<String, String>> {
    private final AbstractRedisClient client;
    private final Function<StatefulConnection<String, String>, BaseRedisCommands<String, String>> sync;
    private final XReadArgs args;
    private XReadArgs.StreamOffset<String> offset;
    private Iterator<StreamMessage<String, String>> messages = Collections.emptyIterator();
    private StatefulConnection<String, String> connection;

    /* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader$StreamItemReaderBuilder.class */
    public static class StreamItemReaderBuilder extends RedisClientBuilder<StreamItemReaderBuilder> {
        private XReadArgs args = new XReadArgs();
        private XReadArgs.StreamOffset<String> offset;

        public StreamItemReader build() {
            return new StreamItemReader(this.client, this.args, this.offset);
        }

        public StreamItemReaderBuilder args(XReadArgs xReadArgs) {
            this.args = xReadArgs;
            return this;
        }

        public StreamItemReaderBuilder offset(XReadArgs.StreamOffset<String> streamOffset) {
            this.offset = streamOffset;
            return this;
        }
    }

    public StreamItemReader(AbstractRedisClient abstractRedisClient, XReadArgs xReadArgs, XReadArgs.StreamOffset<String> streamOffset) {
        setName(ClassUtils.getShortName(getClass()));
        Assert.notNull(abstractRedisClient, "A Redis client is required.");
        Assert.notNull(xReadArgs, "XREAD args are required.");
        Assert.notNull(streamOffset, "Stream offset is required.");
        this.client = abstractRedisClient;
        this.sync = ClientUtils.sync(abstractRedisClient);
        this.args = xReadArgs;
        this.offset = streamOffset;
    }

    protected synchronized void doOpen() {
        this.connection = ClientUtils.connection(this.client);
    }

    protected void doClose() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: doRead, reason: merged with bridge method [inline-methods] */
    public StreamMessage<String, String> m3doRead() throws Exception {
        while (!this.messages.hasNext()) {
            this.messages = this.sync.apply(this.connection).xread(this.args, new XReadArgs.StreamOffset[]{this.offset}).iterator();
        }
        StreamMessage<String, String> next = this.messages.next();
        this.offset = XReadArgs.StreamOffset.from((String) next.getStream(), next.getId());
        return next;
    }

    public static StreamItemReaderBuilder builder() {
        return new StreamItemReaderBuilder();
    }
}
