package zipkin.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.utils.Bytes;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.spanstore.guava.GuavaSpanConsumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:zipkin/cassandra/CassandraSpanConsumer.class */
public final class CassandraSpanConsumer implements GuavaSpanConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanConsumer.class);
    private static final long WRITTEN_NAMES_TTL = Long.getLong("zipkin.store.cassandra.internal.writtenNamesTtl", 3600000).longValue();
    private static final Function<Object, Void> TO_VOID = Functions.constant((Object) null);
    private static final Random RAND = new Random();
    private final Session session;
    private final TimestampCodec timestampCodec;
    private final int bucketCount;
    private final int spanTtl;
    private final int indexTtl;
    private final PreparedStatement insertSpan;
    private final PreparedStatement insertServiceName;
    private final PreparedStatement insertSpanName;
    private final PreparedStatement insertTraceIdByServiceName;
    private final PreparedStatement insertTraceIdBySpanName;
    private final PreparedStatement insertTraceIdByAnnotation;
    private final PreparedStatement insertTraceIdBySpanDuration;
    private final Map<String, String> metadata;
    private final ThreadLocal<Set<String>> writtenNames = new ThreadLocal<Set<String>>() { // from class: zipkin.cassandra.CassandraSpanConsumer.1
        private long cacheInterval = toCacheInterval(System.currentTimeMillis());

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Set<String> initialValue() {
            return new HashSet();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Set<String> get() {
            long cacheInterval = toCacheInterval(System.currentTimeMillis());
            if (this.cacheInterval != cacheInterval) {
                this.cacheInterval = cacheInterval;
                set(new HashSet());
            }
            return (Set) super.get();
        }

        private long toCacheInterval(long j) {
            return j / CassandraSpanConsumer.WRITTEN_NAMES_TTL;
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public CassandraSpanConsumer(Session session, Map<String, String> map, int i, int i2, int i3) {
        this.session = session;
        this.timestampCodec = new TimestampCodec(session);
        this.bucketCount = i;
        this.spanTtl = i2;
        this.indexTtl = i3;
        this.metadata = map;
        this.insertSpan = session.prepare(QueryBuilder.insertInto("traces").value("trace_id", QueryBuilder.bindMarker("trace_id")).value("ts", QueryBuilder.bindMarker("ts")).value("span_name", QueryBuilder.bindMarker("span_name")).value("span", QueryBuilder.bindMarker("span")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
        this.insertServiceName = session.prepare(QueryBuilder.insertInto("service_names").value("service_name", QueryBuilder.bindMarker("service_name")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
        this.insertSpanName = session.prepare(QueryBuilder.insertInto("span_names").value("service_name", QueryBuilder.bindMarker("service_name")).value("bucket", QueryBuilder.bindMarker("bucket")).value("span_name", QueryBuilder.bindMarker("span_name")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
        this.insertTraceIdByServiceName = session.prepare(QueryBuilder.insertInto("service_name_index").value("service_name", QueryBuilder.bindMarker("service_name")).value("bucket", QueryBuilder.bindMarker("bucket")).value("ts", QueryBuilder.bindMarker("ts")).value("trace_id", QueryBuilder.bindMarker("trace_id")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
        this.insertTraceIdBySpanName = session.prepare(QueryBuilder.insertInto("service_span_name_index").value("service_span_name", QueryBuilder.bindMarker("service_span_name")).value("ts", QueryBuilder.bindMarker("ts")).value("trace_id", QueryBuilder.bindMarker("trace_id")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
        this.insertTraceIdByAnnotation = session.prepare(QueryBuilder.insertInto("annotations_index").value("annotation", QueryBuilder.bindMarker("annotation")).value("bucket", QueryBuilder.bindMarker("bucket")).value("ts", QueryBuilder.bindMarker("ts")).value("trace_id", QueryBuilder.bindMarker("trace_id")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
        this.insertTraceIdBySpanDuration = session.prepare(QueryBuilder.insertInto("span_duration_index").value("service_name", QueryBuilder.bindMarker("service_name")).value("span_name", QueryBuilder.bindMarker("span_name")).value("bucket", QueryBuilder.bindMarker("bucket")).value("duration", QueryBuilder.bindMarker("duration")).value("ts", QueryBuilder.bindMarker("ts")).value("trace_id", QueryBuilder.bindMarker("trace_id")).using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_"))));
    }

    public ListenableFuture<Void> accept(List<Span> list) {
        LinkedList linkedList = new LinkedList();
        Iterator<Span> it = list.iterator();
        while (it.hasNext()) {
            Span apply = ApplyTimestampAndDuration.apply(it.next());
            linkedList.add(storeSpan(apply.traceId, apply.timestamp != null ? apply.timestamp.longValue() : 0L, String.format("%d_%d_%d", Long.valueOf(apply.id), Integer.valueOf(apply.annotations.hashCode()), Integer.valueOf(apply.binaryAnnotations.hashCode())), ByteBuffer.wrap(Codec.THRIFT.writeSpan(apply)), this.spanTtl));
            for (String str : apply.serviceNames()) {
                linkedList.add(storeServiceName(str, this.indexTtl));
                if (!apply.name.isEmpty()) {
                    linkedList.add(storeSpanName(str, apply.name, this.indexTtl));
                }
                if (apply.timestamp != null) {
                    linkedList.add(storeTraceIdByServiceName(str, apply.timestamp.longValue(), apply.traceId, this.indexTtl));
                    if (!apply.name.isEmpty()) {
                        linkedList.add(storeTraceIdBySpanName(str, apply.name, apply.timestamp.longValue(), apply.traceId, this.indexTtl));
                    }
                    if (apply.duration != null) {
                        linkedList.add(storeTraceIdByDuration(str, apply.name, apply.timestamp.longValue(), apply.duration.longValue(), apply.traceId, this.indexTtl));
                        if (!apply.name.isEmpty()) {
                            storeTraceIdByDuration(str, "", apply.timestamp.longValue(), apply.duration.longValue(), apply.traceId, this.indexTtl);
                        }
                    }
                }
            }
            if (apply.timestamp != null) {
                Iterator<String> it2 = CassandraUtil.annotationKeys(apply).iterator();
                while (it2.hasNext()) {
                    linkedList.add(storeTraceIdByAnnotation(it2.next(), apply.timestamp.longValue(), apply.traceId, this.indexTtl));
                }
            }
        }
        return Futures.transform(Futures.allAsList(linkedList), TO_VOID);
    }

    ListenableFuture<?> storeSpan(long j, long j2, String str, ByteBuffer byteBuffer, int i) {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(!str.isEmpty());
        if (0 == j2) {
            try {
                if (this.metadata.get("traces.compaction.class").contains("DateTieredCompactionStrategy")) {
                    LOG.warn("Span {} in trace {} had no timestamp. If this happens a lot consider switching back to SizeTieredCompactionStrategy for {}.traces", new Object[]{str, Long.valueOf(j), this.session.getLoggedKeyspace()});
                }
            } catch (RuntimeException e) {
                LOG.error("failed " + debugInsertSpan(j, j2, str, byteBuffer, i), e);
                return Futures.immediateFailedFuture(e);
            }
        }
        BoundStatement boundStatement = this.insertSpan.bind().setLong("trace_id", j).setBytesUnsafe("ts", this.timestampCodec.serialize(j2)).setString("span_name", str).setBytes("span", byteBuffer).setInt("ttl_", i);
        if (LOG.isDebugEnabled()) {
            LOG.debug(debugInsertSpan(j, j2, str, byteBuffer, i));
        }
        return this.session.executeAsync(boundStatement);
    }

    private String debugInsertSpan(long j, long j2, String str, ByteBuffer byteBuffer, int i) {
        return this.insertSpan.getQueryString().replace(":trace_id", String.valueOf(j)).replace(":ts", String.valueOf(j2)).replace(":span_name", str).replace(":span", Bytes.toHexString(byteBuffer)).replace(":ttl_", String.valueOf(i));
    }

    ListenableFuture<?> storeServiceName(String str, int i) {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(!str.isEmpty());
        if (!this.writtenNames.get().add(str)) {
            return Futures.immediateFuture((Object) null);
        }
        try {
            BoundStatement boundStatement = this.insertServiceName.bind().setString("service_name", str).setInt("ttl_", i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertServiceName(str, i));
            }
            return this.session.executeAsync(boundStatement);
        } catch (RuntimeException e) {
            LOG.error("failed " + debugInsertServiceName(str, i), e);
            this.writtenNames.get().remove(str);
            throw e;
        }
    }

    private String debugInsertServiceName(String str, int i) {
        return this.insertServiceName.getQueryString().replace(":service_name", str).replace(":ttl_", String.valueOf(i));
    }

    ListenableFuture<?> storeSpanName(String str, String str2, int i) {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(!str.isEmpty());
        Preconditions.checkNotNull(str2);
        Preconditions.checkArgument(!str2.isEmpty());
        if (!this.writtenNames.get().add(str + "––" + str2)) {
            return Futures.immediateFuture((Object) null);
        }
        try {
            BoundStatement boundStatement = this.insertSpanName.bind().setString("service_name", str).setInt("bucket", 0).setString("span_name", str2).setInt("ttl_", i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertSpanName(0, str, str2, i));
            }
            return this.session.executeAsync(boundStatement);
        } catch (RuntimeException e) {
            LOG.error("failed " + debugInsertSpanName(0, str, str2, i), e);
            this.writtenNames.get().remove(str + "––" + str2);
            return Futures.immediateFailedFuture(e);
        }
    }

    private String debugInsertSpanName(int i, String str, String str2, int i2) {
        return this.insertSpanName.getQueryString().replace(":bucket", String.valueOf(i)).replace(":service_name", str).replace(":span_name", str2).replace(":ttl_", String.valueOf(i2));
    }

    ListenableFuture<?> storeTraceIdByServiceName(String str, long j, long j2, int i) {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(!str.isEmpty());
        int nextInt = RAND.nextInt(this.bucketCount);
        try {
            BoundStatement boundStatement = this.insertTraceIdByServiceName.bind().setInt("bucket", nextInt).setString("service_name", str).setBytesUnsafe("ts", this.timestampCodec.serialize(j)).setLong("trace_id", j2).setInt("ttl_", i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdByServiceName(nextInt, str, j, j2, i));
            }
            return this.session.executeAsync(boundStatement);
        } catch (RuntimeException e) {
            LOG.error("failed " + debugInsertTraceIdByServiceName(nextInt, str, j, j2, i), e);
            return Futures.immediateFailedFuture(e);
        }
    }

    private String debugInsertTraceIdByServiceName(int i, String str, long j, long j2, int i2) {
        return this.insertTraceIdByServiceName.getQueryString().replace(":bucket", String.valueOf(i)).replace(":service_name", str).replace(":ts", CassandraUtil.iso8601(j)).replace(":trace_id", String.valueOf(j2)).replace(":ttl_", String.valueOf(i2));
    }

    ListenableFuture<?> storeTraceIdBySpanName(String str, String str2, long j, long j2, int i) {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(!str.isEmpty());
        Preconditions.checkNotNull(str2);
        Preconditions.checkArgument(!str2.isEmpty());
        try {
            String str3 = str + "." + str2;
            BoundStatement boundStatement = this.insertTraceIdBySpanName.bind().setString("service_span_name", str3).setBytesUnsafe("ts", this.timestampCodec.serialize(j)).setLong("trace_id", j2).setInt("ttl_", i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdBySpanName(str3, j, j2, i));
            }
            return this.session.executeAsync(boundStatement);
        } catch (RuntimeException e) {
            LOG.error("failed " + debugInsertTraceIdBySpanName(str, j, j2, i), e);
            return Futures.immediateFailedFuture(e);
        }
    }

    private String debugInsertTraceIdBySpanName(String str, long j, long j2, int i) {
        return this.insertTraceIdBySpanName.getQueryString().replace(":service_span_name", str).replace(":ts", String.valueOf(j)).replace(":trace_id", String.valueOf(j2)).replace(":ttl_", String.valueOf(i));
    }

    ListenableFuture<?> storeTraceIdByAnnotation(String str, long j, long j2, int i) {
        int nextInt = RAND.nextInt(this.bucketCount);
        try {
            BoundStatement boundStatement = this.insertTraceIdByAnnotation.bind().setInt("bucket", nextInt).setBytes("annotation", CassandraUtil.toByteBuffer(str)).setBytesUnsafe("ts", this.timestampCodec.serialize(j)).setLong("trace_id", j2).setInt("ttl_", i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdByAnnotation(nextInt, str, j, j2, i));
            }
            return this.session.executeAsync(boundStatement);
        } catch (RuntimeException | CharacterCodingException e) {
            LOG.error("failed " + debugInsertTraceIdByAnnotation(nextInt, str, j, j2, i), e);
            return Futures.immediateFailedFuture(e);
        }
    }

    private String debugInsertTraceIdByAnnotation(int i, String str, long j, long j2, int i2) {
        return this.insertTraceIdByAnnotation.getQueryString().replace(":bucket", String.valueOf(i)).replace(":annotation", str).replace(":ts", CassandraUtil.iso8601(j)).replace(":trace_id", String.valueOf(j2)).replace(":ttl_", String.valueOf(i2));
    }

    ListenableFuture<?> storeTraceIdByDuration(String str, String str2, long j, long j2, long j3, int i) {
        int durationIndexBucket = CassandraUtil.durationIndexBucket(j);
        try {
            BoundStatement boundStatement = this.insertTraceIdBySpanDuration.bind().setInt("bucket", durationIndexBucket).setString("service_name", str).setString("span_name", str2).setBytesUnsafe("ts", this.timestampCodec.serialize(j)).setLong("duration", j2).setLong("trace_id", j3).setInt("ttl_", i);
            if (LOG.isDebugEnabled()) {
                LOG.debug(debugInsertTraceIdBySpanDuration(durationIndexBucket, str, str2, j, j2, j3, i));
            }
            return this.session.executeAsync(boundStatement);
        } catch (RuntimeException e) {
            LOG.error("failed " + debugInsertTraceIdBySpanDuration(durationIndexBucket, str, str2, j, j2, j3, i));
            return Futures.immediateFailedFuture(e);
        }
    }

    private String debugInsertTraceIdBySpanDuration(int i, String str, String str2, long j, long j2, long j3, int i2) {
        return this.insertTraceIdBySpanDuration.getQueryString().replace(":bucket", String.valueOf(i)).replace(":service_name", str).replace(":span_name", str2).replace(":ts", CassandraUtil.iso8601(j)).replace(":duration", String.valueOf(j2)).replace(":trace_id", String.valueOf(j3)).replace(":ttl_", String.valueOf(i2));
    }
}
