package io.axway.iron.spi.aws.kinesis;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import io.axway.alf.Arguments;
import io.axway.alf.assertion.Assertion;
import io.axway.alf.log.Logger;
import io.axway.alf.log.LoggerFactory;
import io.axway.iron.spi.StoreNamePrefixManagement;
import io.axway.iron.spi.aws.AwsUtils;
import io.axway.iron.spi.storage.TransactionStore;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/axway/iron/spi/aws/kinesis/AwsKinesisTransactionStore.class */
public class AwsKinesisTransactionStore implements TransactionStore {
    private static final Logger LOG = LoggerFactory.getLogger(AwsKinesisTransactionStore.class);
    private static final long INITIAL_MINIMAL_DURATION_BETWEEN_TWO_GET_SHARD_ITERATOR_REQUESTS = 250;
    private static final String USELESS_PARTITION_KEY = "uselessPartitionKey";
    private final String m_streamName;
    private final Shard m_shard;
    private final AmazonKinesis m_kinesis;
    private String m_shardIterator;
    private final Flowable<TransactionStore.TransactionInput> m_transactionsFlow;
    private BigInteger m_seekTransactionId = null;

    @Nullable
    private Long m_lastGetShardIteratorRequestTime = null;
    private final StoreNamePrefixManagement m_prefixManagement = new StoreNamePrefixManagement();
    private AtomicLong m_durationBetweenRequests = new AtomicLong(INITIAL_MINIMAL_DURATION_BETWEEN_TWO_GET_SHARD_ITERATOR_REQUESTS);

    /* loaded from: input_file:io/axway/iron/spi/aws/kinesis/AwsKinesisTransactionStore$ByteBufferBackedInputStream.class */
    private class ByteBufferBackedInputStream extends InputStream {
        private final ByteBuffer m_byteBuffer;

        private ByteBufferBackedInputStream(ByteBuffer byteBuffer) {
            this.m_byteBuffer = byteBuffer;
        }

        @Override // java.io.InputStream
        public int available() {
            return this.m_byteBuffer.remaining();
        }

        @Override // java.io.InputStream
        public int read() {
            if (this.m_byteBuffer.hasRemaining()) {
                return this.m_byteBuffer.get() & 255;
            }
            return -1;
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) {
            if (!this.m_byteBuffer.hasRemaining()) {
                return -1;
            }
            int min = Math.min(i2, this.m_byteBuffer.remaining());
            this.m_byteBuffer.get(bArr, i, min);
            return min;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AwsKinesisTransactionStore(String str, String str2, String str3, Integer num, String str4, String str5) {
        this.m_kinesis = AwsKinesisUtils.buildKinesisClient(str, str2, str3, num, str4);
        this.m_streamName = str5;
        AwsKinesisUtils.ensureStreamExists(this.m_kinesis, this.m_streamName);
        this.m_shard = getUniqueShard();
        GetShardIteratorRequest withShardId = new GetShardIteratorRequest().withStreamName(this.m_streamName).withShardIteratorType(ShardIteratorType.TRIM_HORIZON).withShardId(this.m_shard.getShardId());
        this.m_shardIterator = ((GetShardIteratorResult) AwsUtils.performAmazonActionWithRetry("describe stream " + this.m_streamName, () -> {
            return this.m_kinesis.getShardIterator(withShardId);
        }, 5, 5)).getShardIterator();
        this.m_transactionsFlow = Flowable.generate(emitter -> {
            emitter.onNext(nextRecords());
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).concatMap((v0) -> {
            return Flowable.fromIterable(v0);
        }).map(record -> {
            final BigInteger bigInteger = new BigInteger(record.getSequenceNumber());
            this.m_seekTransactionId = bigInteger;
            final ByteBufferBackedInputStream byteBufferBackedInputStream = new ByteBufferBackedInputStream(record.getData().asReadOnlyBuffer());
            final String readStoreName = StoreNamePrefixManagement.readStoreName(byteBufferBackedInputStream);
            return new TransactionStore.TransactionInput() { // from class: io.axway.iron.spi.aws.kinesis.AwsKinesisTransactionStore.1
                public String storeName() {
                    return readStoreName;
                }

                public InputStream getInputStream() {
                    return byteBufferBackedInputStream;
                }

                public BigInteger getTransactionId() {
                    return bigInteger;
                }
            };
        });
    }

    public OutputStream createTransactionOutput(String str) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream() { // from class: io.axway.iron.spi.aws.kinesis.AwsKinesisTransactionStore.2
            @Override // java.io.ByteArrayOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                super.close();
                ByteBuffer wrap = ByteBuffer.wrap(toByteArray());
                AwsUtils.performAmazonActionWithRetry("describe stream " + AwsKinesisTransactionStore.this.m_streamName, () -> {
                    AwsKinesisTransactionStore.this.m_kinesis.putRecord(new PutRecordRequest().withStreamName(AwsKinesisTransactionStore.this.m_streamName).withData(wrap).withPartitionKey(AwsKinesisTransactionStore.USELESS_PARTITION_KEY));
                    return null;
                }, 5, AwsKinesisUtils.DEFAULT_RETRY_DURATION_IN_MILLIS);
            }
        };
        this.m_prefixManagement.writeNamePrefix(str, byteArrayOutputStream);
        return byteArrayOutputStream;
    }

    public Publisher<TransactionStore.TransactionInput> allTransactions() {
        return this.m_transactionsFlow;
    }

    public void seekTransaction(BigInteger bigInteger) {
        this.m_seekTransactionId = bigInteger;
        if (this.m_seekTransactionId.equals(BigInteger.ZERO)) {
            return;
        }
        GetShardIteratorRequest withStartingSequenceNumber = new GetShardIteratorRequest().withStreamName(this.m_streamName).withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER).withShardId(this.m_shard.getShardId()).withStartingSequenceNumber(this.m_seekTransactionId.toString());
        this.m_shardIterator = ((GetShardIteratorResult) AwsUtils.performAmazonActionWithRetry("describe stream " + this.m_streamName, () -> {
            return this.m_kinesis.getShardIterator(withStartingSequenceNumber);
        }, 5, AwsKinesisUtils.DEFAULT_RETRY_DURATION_IN_MILLIS)).getShardIterator();
    }

    public void close() {
        this.m_kinesis.shutdown();
    }

    private Shard getUniqueShard() {
        DescribeStreamRequest withLimit = new DescribeStreamRequest().withStreamName(this.m_streamName).withLimit(1);
        DescribeStreamResult describeStreamResult = (DescribeStreamResult) AwsUtils.performAmazonActionWithRetry("describe stream " + this.m_streamName, () -> {
            return this.m_kinesis.describeStream(withLimit);
        }, 5, AwsKinesisUtils.DEFAULT_RETRY_DURATION_IN_MILLIS);
        String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
        if (streamStatus == null || !streamStatus.equals("ACTIVE")) {
            throw new AwsKinesisException("Stream does not exist", (Consumer<Arguments>) arguments -> {
                arguments.add("streamName", this.m_streamName);
            });
        }
        List shards = describeStreamResult.getStreamDescription().getShards();
        Assertion.checkState(shards.size() == 1, "Kinesis Stream should contain only one shard", arguments2 -> {
            arguments2.add("streamName", this.m_streamName).add("shardCount", Integer.valueOf(shards.size()));
        });
        return (Shard) shards.get(0);
    }

    @Nullable
    private List<Record> nextRecords() {
        return !waitTheMinimalDurationToExecuteTheNextProvisioningRequest() ? List.of() : getRecords(new GetRecordsRequest().withShardIterator(this.m_shardIterator).withLimit(1));
    }

    private boolean waitTheMinimalDurationToExecuteTheNextProvisioningRequest() {
        if (this.m_lastGetShardIteratorRequestTime != null) {
            long currentTimeMillis = this.m_durationBetweenRequests.get() - (System.currentTimeMillis() - this.m_lastGetShardIteratorRequestTime.longValue());
            if (currentTimeMillis > 0) {
                try {
                    Thread.sleep(currentTimeMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        this.m_lastGetShardIteratorRequestTime = Long.valueOf(System.currentTimeMillis());
        return true;
    }

    @Nullable
    private List<Record> getRecords(GetRecordsRequest getRecordsRequest) {
        return (List) AwsUtils.tryAmazonAction("", () -> {
            GetRecordsResult records = this.m_kinesis.getRecords(getRecordsRequest);
            this.m_shardIterator = records.getNextShardIterator();
            List records2 = records.getRecords();
            LOG.trace("Get records", arguments -> {
                arguments.add("streamName", this.m_streamName).add("record number", Integer.valueOf(records2.size())).add("millisBehindLatest", records.getMillisBehindLatest());
            });
            return records2;
        }, this.m_durationBetweenRequests).orElse(List.of());
    }
}
