package dev.responsive.kafka.internal.db.partitioning;

import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
import dev.responsive.kafka.internal.utils.StoreUtil;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/partitioning/SegmentPartitioner.class */
/**
 * Partitions windowed keys into time-based segments.
 *
 * <p>A key is assigned to the segment whose id is {@code windowStartMs / segmentIntervalMs}
 * (clamped to be non-negative), paired with the caller-supplied partition number. The
 * partitioner also computes which segments fall inside the retention range for a given
 * stream-time, and which segments must be created or expired when stream-time changes.
 */
public class SegmentPartitioner implements TablePartitioner<WindowedKey, SegmentPartition> {
    private static final Logger LOG = LoggerFactory.getLogger(SegmentPartitioner.class);

    /** Segment id used for the metadata partition; never produced by {@link #segmentId(long)}. */
    public static final long METADATA_SEGMENT_ID = -1;

    /** Sentinel stream-time meaning that no stream-time has been set yet. */
    public static final long UNINITIALIZED_STREAM_TIME = -1;

    private final long retentionPeriodMs;
    private final long segmentIntervalMs;
    private final long windowSizeMs;

    /**
     * Identifies a single segment: a partition number combined with a segment id.
     *
     * <p>Instances are immutable and compare by value, so two instances with the same
     * coordinates are interchangeable (for example as map keys or in collection comparisons).
     */
    public static class SegmentPartition {
        public final int tablePartition;
        public final long segmentId;

        /**
         * @param tablePartition the partition number
         * @param segmentId the segment id, or {@link #METADATA_SEGMENT_ID} for the metadata partition
         */
        public SegmentPartition(int tablePartition, long segmentId) {
            this.tablePartition = tablePartition;
            this.segmentId = segmentId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SegmentPartition that = (SegmentPartition) o;
            return this.tablePartition == that.tablePartition && this.segmentId == that.segmentId;
        }

        @Override
        public int hashCode() {
            return 31 * Integer.hashCode(this.tablePartition) + Long.hashCode(this.segmentId);
        }

        @Override
        public String toString() {
            return "SegmentPartition{tablePartition=" + this.tablePartition + ", segmentId=" + this.segmentId + "}";
        }
    }

    /**
     * Result of a stream-time change: the segment ids that should be expired and the
     * segment ids that should be created. Both arrays are in ascending order as produced
     * by {@link SegmentPartitioner#rolledSegments(String, long, long)}.
     */
    public static class SegmentRoll {
        public final long[] segmentsToExpire;
        public final long[] segmentsToCreate;

        /**
         * @param segmentsToExpire ids of the segments to expire (may be empty)
         * @param segmentsToCreate ids of the segments to create (may be empty)
         */
        public SegmentRoll(long[] segmentsToExpire, long[] segmentsToCreate) {
            this.segmentsToExpire = segmentsToExpire;
            this.segmentsToCreate = segmentsToCreate;
        }

        @Override
        public String toString() {
            return String.format(
                "SegmentRoll: expired segment(s)=%s, new segments(s)=%s",
                describe(this.segmentsToExpire),
                describe(this.segmentsToCreate));
        }

        /** Renders an array as {@code [first-last]}, or {@code []} when it is empty. */
        private static String describe(long[] segmentIds) {
            if (segmentIds.length == 0) {
                return "[]";
            }
            return String.format("[%d-%d]", segmentIds[0], segmentIds[segmentIds.length - 1]);
        }
    }

    /**
     * Builds a partitioner from window store parameters. The segment interval is derived
     * from the retention period and the number of segments via
     * {@code StoreUtil.computeSegmentInterval}.
     *
     * @param responsiveWindowParams the window store parameters
     * @return a new partitioner
     */
    public static SegmentPartitioner create(ResponsiveWindowParams responsiveWindowParams) {
        long retentionPeriodMs = responsiveWindowParams.retentionPeriod();
        long segmentIntervalMs = StoreUtil.computeSegmentInterval(
            responsiveWindowParams.retentionPeriod(),
            responsiveWindowParams.numSegments());
        return new SegmentPartitioner(retentionPeriodMs, segmentIntervalMs, responsiveWindowParams.windowSize());
    }

    /**
     * @param retentionPeriodMs the retention period in milliseconds; must be positive
     * @param segmentIntervalMs the time span covered by one segment in milliseconds; must be positive
     * @param windowSizeMs the window size in milliseconds; must be positive
     * @throws IllegalStateException if any argument is zero or negative
     */
    public SegmentPartitioner(long retentionPeriodMs, long segmentIntervalMs, long windowSizeMs) {
        this.retentionPeriodMs = retentionPeriodMs;
        this.segmentIntervalMs = segmentIntervalMs;
        this.windowSizeMs = windowSizeMs;
        if (retentionPeriodMs <= 0 || segmentIntervalMs <= 0 || windowSizeMs <= 0) {
            LOG.error("Segment values should all be positive, got retentionPeriod={}ms, segmentInterval={}ms, and windowSize={}ms", retentionPeriodMs, segmentIntervalMs, windowSizeMs);
            throw new IllegalStateException("Segment partitioner received a negative or zero value");
        }
        LOG.info("Created segment partitioner with retentionPeriod={}ms, segmentInterval={}ms, and windowSize={}ms", retentionPeriodMs, segmentIntervalMs, windowSizeMs);
    }

    /**
     * Returns the segment that holds the given key, determined by the key's window start time.
     *
     * @param partition the partition number to embed in the result
     * @param windowedKey the key whose {@code windowStartMs} selects the segment
     */
    @Override
    public SegmentPartition tablePartition(int partition, WindowedKey windowedKey) {
        return new SegmentPartition(partition, segmentId(windowedKey.windowStartMs));
    }

    /**
     * Returns the metadata partition for the given partition number, identified by
     * {@link #METADATA_SEGMENT_ID}.
     */
    @Override
    public SegmentPartition metadataTablePartition(int partition) {
        return new SegmentPartition(partition, METADATA_SEGMENT_ID);
    }

    /**
     * Returns the segments within the retention range ending at {@code streamTime},
     * in ascending segment order.
     *
     * @param partition the partition number to embed in each result
     * @param streamTime the current stream-time in milliseconds
     * @return the active segments, or an empty list if {@code streamTime} is
     *     {@link #UNINITIALIZED_STREAM_TIME}
     */
    public List<SegmentPartition> activeSegments(int partition, long streamTime) {
        if (streamTime == UNINITIALIZED_STREAM_TIME) {
            return Collections.emptyList();
        }
        return range(partition, minValidTs(streamTime), streamTime);
    }

    /**
     * Returns every segment overlapping {@code [timeFrom, timeTo]} in ascending segment order.
     *
     * @param partition the partition number to embed in each result
     * @param timeFrom the inclusive lower timestamp bound in milliseconds
     * @param timeTo the inclusive upper timestamp bound in milliseconds
     * @return the matching segments; empty if the upper segment precedes the lower one
     */
    public List<SegmentPartition> range(int partition, long timeFrom, long timeTo) {
        // rangeClosed rather than range(lo, hi + 1): same elements, but no overflow on hi + 1
        return LongStream.rangeClosed(segmentId(timeFrom), segmentId(timeTo))
            .mapToObj(id -> new SegmentPartition(partition, id))
            .collect(Collectors.toList());
    }

    /**
     * Same segments as {@link #range(int, long, long)} but in descending segment order.
     *
     * @param partition the partition number to embed in each result
     * @param timeFrom the inclusive lower timestamp bound in milliseconds
     * @param timeTo the inclusive upper timestamp bound in milliseconds
     * @return the matching segments, highest segment id first
     */
    public List<SegmentPartition> reverseRange(int partition, long timeFrom, long timeTo) {
        final long minSegment = segmentId(timeFrom);
        final long maxSegment = segmentId(timeTo);
        // Mirror each ascending id around the range to get descending order,
        // avoiding the boxing and sort a reverse-order comparator would need.
        return LongStream.rangeClosed(minSegment, maxSegment)
            .map(id -> maxSegment - (id - minSegment))
            .mapToObj(id -> new SegmentPartition(partition, id))
            .collect(Collectors.toList());
    }

    /**
     * Computes which segments to expire and which to create when stream-time moves from
     * {@code oldStreamTime} to {@code newStreamTime}.
     *
     * <p>If {@code oldStreamTime} is {@link #UNINITIALIZED_STREAM_TIME}, nothing is expired and
     * every segment in the retention range ending at {@code newStreamTime} is created.
     *
     * @param tableName the table name, used only for logging
     * @param oldStreamTime the previous stream-time in milliseconds, or
     *     {@link #UNINITIALIZED_STREAM_TIME}
     * @param newStreamTime the new stream-time in milliseconds
     * @return the segments to expire and to create
     */
    public SegmentRoll rolledSegments(String tableName, long oldStreamTime, long newStreamTime) {
        final long newMaxSegment = segmentId(newStreamTime);
        final long newMinSegment = segmentId(minValidTs(newStreamTime));

        if (oldStreamTime == UNINITIALIZED_STREAM_TIME) {
            final long[] initialSegments = LongStream.rangeClosed(newMinSegment, newMaxSegment).toArray();
            LOG.info("Initializing stream-time for table {} to {}ms and creating segments: [{}-{}]", tableName, newStreamTime, newMinSegment, newMaxSegment);
            return new SegmentRoll(new long[0], initialSegments);
        }

        final long oldMaxSegment = segmentId(oldStreamTime);
        final long oldMinSegment = segmentId(minValidTs(oldStreamTime));

        // Expired: segments that were the minimum or above before, but are below the new minimum.
        final long[] expiredSegments = LongStream.range(oldMinSegment, newMinSegment).toArray();
        // Created: segments above the old maximum, up to and including the new maximum.
        final long[] createdSegments = LongStream.rangeClosed(oldMaxSegment + 1, newMaxSegment).toArray();

        if (createdSegments.length > 0) {
            // NOTE: the second expiredSegments value logged here (newMinSegment) is the exclusive
            // upper bound of the expired range, i.e. the first segment that is still retained.
            LOG.info("Advancing stream-time for table {} from {}ms to {}ms and rolling segments with expiredSegments: [{}-{}] and newSegments: [{}-{}]", tableName, oldStreamTime, newStreamTime, oldMinSegment, newMinSegment, oldMaxSegment + 1, newMaxSegment);
        }
        return new SegmentRoll(expiredSegments, createdSegments);
    }

    /**
     * Maps a timestamp to its segment id. Timestamps that would yield a negative id are
     * clamped to segment 0.
     */
    private long segmentId(long timestampMs) {
        return Long.max(0L, timestampMs / this.segmentIntervalMs);
    }

    /**
     * Returns the lowest timestamp of the retention range that ends at {@code streamTime},
     * i.e. {@code streamTime - retentionPeriodMs + 1}. The result may be negative.
     */
    private long minValidTs(long streamTime) {
        return (streamTime - this.retentionPeriodMs) + 1;
    }
}
