package dev.responsive.kafka.api;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import dev.responsive.db.CassandraClient;
import dev.responsive.kafka.config.ResponsiveDriverConfig;
import dev.responsive.utils.SessionUtil;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

/* loaded from: input_file:dev/responsive/kafka/api/ResponsiveDriver.class */
public class ResponsiveDriver implements StreamsStoreDriver, Closeable {
    private static final Map<String, String> CHANGELOG_CONFIG = Map.of("cleanup.policy", "delete");
    private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(2);
    private final CqlSession session;
    private final CassandraClient client;
    private final Admin admin;

    public static ResponsiveDriver connect(Map<String, Object> map) {
        Properties properties = new Properties();
        properties.putAll(map);
        return connect(properties);
    }

    public static ResponsiveDriver connect(Properties properties) {
        ResponsiveDriverConfig responsiveDriverConfig = new ResponsiveDriverConfig(properties);
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved(responsiveDriverConfig.getString(ResponsiveDriverConfig.STORAGE_HOSTNAME_CONFIG), responsiveDriverConfig.getInt(ResponsiveDriverConfig.STORAGE_PORT_CONFIG).intValue());
        String string = responsiveDriverConfig.getString(ResponsiveDriverConfig.STORAGE_DATACENTER_CONFIG);
        String string2 = responsiveDriverConfig.getString(ResponsiveDriverConfig.CLIENT_ID_CONFIG);
        Password password = responsiveDriverConfig.getPassword(ResponsiveDriverConfig.CLIENT_SECRET_CONFIG);
        return new ResponsiveDriver(SessionUtil.connect(createUnresolved, string, responsiveDriverConfig.getString(ResponsiveDriverConfig.TENANT_ID_CONFIG), string2, password == null ? null : password.value()), Admin.create(properties));
    }

    @VisibleForTesting
    public ResponsiveDriver(CqlSession cqlSession, Admin admin) {
        this.session = cqlSession;
        this.client = new CassandraClient(cqlSession);
        this.admin = admin;
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public KeyValueBytesStoreSupplier kv(String str) {
        return new ResponsiveKeyValueBytesStoreSupplier(this.client, str, this.executor, this.admin, false);
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public KeyValueBytesStoreSupplier timestampedKv(String str) {
        return new ResponsiveKeyValueBytesStoreSupplier(this.client, str, this.executor, this.admin, true);
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public WindowBytesStoreSupplier windowed(String str, long j, long j2, boolean z) {
        return new ResponsiveWindowedStoreSupplier(this.client, str, this.executor, this.admin, j, j2, z);
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public KeyValueBytesStoreSupplier globalKv(String str) {
        return new ResponsiveGlobalKeyValueBytesStoreSupplier(this.client, str, this.executor);
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized(String str) {
        return Materialized.as(timestampedKv(str)).withLoggingEnabled(CHANGELOG_CONFIG);
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> windowMaterialized(String str, long j, long j2, boolean z) {
        return Materialized.as(windowed(str, j, j2, z)).withLoggingEnabled(CHANGELOG_CONFIG);
    }

    @Override // dev.responsive.kafka.api.StreamsStoreDriver
    public <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> globalMaterialized(String str) {
        return Materialized.as(globalKv(str)).withCachingDisabled();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.session.close();
        this.executor.shutdown();
    }
}
