package dev.responsive.kafka.internal.metrics;

import dev.responsive.kafka.internal.clients.OffsetRecorder;
import dev.responsive.kafka.internal.clients.ResponsiveConsumer;
import java.io.Closeable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/metrics/MetricPublishingCommitListener.class */
public class MetricPublishingCommitListener implements ResponsiveConsumer.Listener, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MetricPublishingCommitListener.class);
    private final ResponsiveMetrics metrics;
    private final String threadId;
    private final Map<TopicPartition, CommittedOffset> offsets = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/kafka/internal/metrics/MetricPublishingCommitListener$CommittedOffset.class */
    public static class CommittedOffset {
        private final TopicPartition topicPartition;
        private final Long offset;

        private CommittedOffset(TopicPartition topicPartition, Long l) {
            this.topicPartition = topicPartition;
            this.offset = l;
        }

        public OptionalLong getOffset() {
            return this.offset == null ? OptionalLong.empty() : OptionalLong.of(this.offset.longValue());
        }
    }

    public MetricPublishingCommitListener(ResponsiveMetrics responsiveMetrics, String str, OffsetRecorder offsetRecorder) {
        this.metrics = (ResponsiveMetrics) Objects.requireNonNull(responsiveMetrics);
        this.threadId = (String) Objects.requireNonNull(str);
        offsetRecorder.addCommitCallback(this::commitCallback);
    }

    private MetricName committedOffsetMetric(TopicPartition topicPartition) {
        return this.metrics.metricName(TopicMetrics.COMMITTED_OFFSET, TopicMetrics.COMMITTED_OFFSET_DESCRIPTION, this.metrics.topicLevelMetric(this.threadId, topicPartition));
    }

    private void commitCallback(Map<OffsetRecorder.RecordingKey, Long> map, Map<TopicPartition, Long> map2) {
        for (Map.Entry<OffsetRecorder.RecordingKey, Long> entry : map.entrySet()) {
            this.offsets.computeIfPresent(entry.getKey().getPartition(), (topicPartition, committedOffset) -> {
                LOG.debug("record committed offset {} {}: {}", new Object[]{this.threadId, topicPartition, entry.getValue()});
                return new CommittedOffset(topicPartition, (Long) entry.getValue());
            });
        }
    }

    @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        Logger logger = LOG;
        Stream<TopicPartition> stream = collection.stream();
        Map<TopicPartition, CommittedOffset> map = this.offsets;
        Objects.requireNonNull(map);
        logger.info("Remove committed offset metrics entry for {}", stream.filter((v1) -> {
            return r3.containsKey(v1);
        }).map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(",")));
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            this.offsets.computeIfPresent(it.next(), (topicPartition, committedOffset) -> {
                this.metrics.removeMetric(committedOffsetMetric(topicPartition));
                return null;
            });
        }
    }

    @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        LOG.info("Add committed offsets metrics entry for {}", collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(",")));
        for (TopicPartition topicPartition : collection) {
            this.offsets.computeIfAbsent(topicPartition, topicPartition2 -> {
                LOG.debug("add committed offset metric for {} {}", this.threadId, topicPartition2);
                this.metrics.addMetric(committedOffsetMetric(topicPartition2), (metricConfig, j) -> {
                    return getCommittedOffset(topicPartition2);
                });
                return new CommittedOffset(topicPartition, null);
            });
        }
    }

    private Long getCommittedOffset(TopicPartition topicPartition) {
        CommittedOffset committedOffset = this.offsets.get(topicPartition);
        if (committedOffset == null || committedOffset.getOffset().isEmpty()) {
            return -1L;
        }
        return Long.valueOf(committedOffset.getOffset().getAsLong());
    }

    @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        onPartitionsRevoked(collection);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (TopicPartition topicPartition : this.offsets.keySet()) {
            if (this.offsets.containsKey(topicPartition)) {
                LOG.info("Clean up committed offset metric {} {}", this.threadId, topicPartition);
                this.metrics.removeMetric(committedOffsetMetric(topicPartition));
            }
        }
        this.offsets.clear();
    }
}
