package dev.responsive.kafka.internal.db;

import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/db/WriterFactory.class */
/**
 * Creates {@link RemoteWriter}s keyed by table partition and coordinates flushing a batch of
 * them, recording an offset via {@link #setOffset(long)} once every writer in the batch has
 * reported that its writes were applied.
 *
 * @param <K> the key type handed to {@link #tablePartitionForKey(Object)}
 * @param <P> the table partition type used to group writers
 */
public abstract class WriterFactory<K, P> {
    private final Logger log;
    private final String logPrefix;

    /**
     * The set of writers taking part in a single flush. A writer is created lazily, at most
     * once per table partition, the first time a key mapping to that partition is requested.
     */
    public class PendingFlush {
        final Map<P, RemoteWriter<K, P>> activeWriters = new HashMap<>();

        /**
         * @return the number of distinct table partitions that currently have a writer
         */
        public int numRemoteWriters() {
            return activeWriters.size();
        }

        /**
         * Returns the writer responsible for {@code key}, creating it through
         * {@link WriterFactory#createWriter(Object)} if this flush has not yet seen the
         * key's table partition.
         *
         * @param key the key about to be written
         * @return the writer for the table partition that {@code key} maps to
         */
        public RemoteWriter<K, P> writerForKey(final K key) {
            final P tablePartition = tablePartitionForKey(key);
            return activeWriters.computeIfAbsent(tablePartition, WriterFactory.this::createWriter);
        }

        /**
         * Flushes every active writer, blocks until all of them have completed, and then
         * records {@code offset} if the combined result reports that it was applied.
         *
         * @param offset the offset to pass to {@link WriterFactory#setOffset(long)} on success
         * @return the result of {@code setOffset(offset)} when the combined flush result was
         *     applied; otherwise the non-applied flush result that was retained
         * @throws RuntimeException if waiting on the flush is interrupted or a flush completes
         *     exceptionally; the original exception is attached as the cause
         */
        private RemoteWriteResult<P> completeFlush(final long offset) {
            // Seed the chain with an already-completed stage so the fold below also works
            // when there are no active writers.
            // NOTE(review): assumes RemoteWriter#flush returns
            // CompletionStage<RemoteWriteResult<P>> -- confirm against RemoteWriter.
            CompletionStage<RemoteWriteResult<P>> combined =
                CompletableFuture.completedStage(RemoteWriteResult.<P>success(null));
            for (final RemoteWriter<K, P> writer : activeWriters.values()) {
                // Once a non-applied result has been seen it is carried through the rest of
                // the chain; otherwise the most recently combined result wins.
                combined = combined.thenCombine(
                    writer.flush(),
                    (soFar, next) -> !soFar.wasApplied() ? soFar : next
                );
            }

            final RemoteWriteResult<P> flushResult;
            try {
                flushResult = combined.toCompletableFuture().get();
            } catch (final InterruptedException e) {
                // Restore the interrupt status before converting to an unchecked exception so
                // that code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw flushFailure(e);
            } catch (final ExecutionException e) {
                throw flushFailure(e);
            }

            if (flushResult.wasApplied()) {
                log.debug("Successfully flushed writes to table partition {}", flushResult.tablePartition());
                return setOffset(offset);
            }
            log.info("Failed to flush writes to table partition {}", flushResult.tablePartition());
            return flushResult;
        }
    }

    /**
     * @param logPrefix prefix used for this factory's log context and prepended to the message
     *     of the exception thrown when a flush fails unexpectedly
     */
    public WriterFactory(final String logPrefix) {
        this.logPrefix = logPrefix;
        this.log = new LogContext(logPrefix).logger(WriterFactory.class);
    }

    /**
     * Creates a writer for the given table partition.
     *
     * @param tablePartition the table partition the new writer will serve
     * @return a writer for {@code tablePartition}
     */
    public abstract RemoteWriter<K, P> createWriter(P tablePartition);

    /**
     * @return the name of the table
     */
    public abstract String tableName();

    /**
     * @param key a key about to be written
     * @return the table partition that {@code key} belongs to
     */
    protected abstract P tablePartitionForKey(K key);

    /**
     * Records the given offset; invoked after a flush whose combined result was applied.
     *
     * @param offset the offset to record
     * @return the result of recording the offset
     */
    protected abstract RemoteWriteResult<P> setOffset(long offset);

    /**
     * @return the offset reported as the "Persisted Offset" in {@link #failedFlushError}
     */
    protected abstract long offset();

    /**
     * @return a new, empty {@link PendingFlush} bound to this factory
     */
    public WriterFactory<K, P>.PendingFlush beginNewFlush() {
        return new PendingFlush();
    }

    /**
     * Completes the given pending flush, blocking until all of its writers have finished.
     *
     * @param pendingFlush the flush to complete
     * @param offset the offset to record if the flush was applied
     * @return the result of recording the offset on success, otherwise the non-applied
     *     flush result
     * @throws RuntimeException if the flush is interrupted or fails exceptionally
     */
    public RemoteWriteResult<P> commitPendingFlush(
        final WriterFactory<K, P>.PendingFlush pendingFlush,
        final long offset
    ) {
        return pendingFlush.completeFlush(offset);
    }

    /**
     * Builds the error message describing a failed batch write.
     *
     * @param result the non-applied write result
     * @param batchOffset the offset of the batch that failed to be written
     * @return a message containing the table partition, the batch offset and the value
     *     currently returned by {@link #offset()}
     */
    public String failedFlushError(final RemoteWriteResult<P> result, final long batchOffset) {
        return String.format(
            "Error while writing batch for table partition %s! Batch Offset: %d, Persisted Offset: %d",
            result.tablePartition(),
            batchOffset,
            offset()
        );
    }

    /** Logs an unexpected flush failure and wraps it in the exception to be thrown. */
    private RuntimeException flushFailure(final Exception cause) {
        log.error("Unexpected exception while flushing to remote", cause);
        return new RuntimeException(logPrefix + "Failed while flushing to remote", cause);
    }
}
