package ru.chermenin.kio.cep.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TypeCastException;
import kotlin.collections.CollectionsKt;
import kotlin.comparisons.ComparisonsKt;
import kotlin.jvm.internal.Intrinsics;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.joda.time.Instant;
import ru.chermenin.kio.cep.ComplexEvent;
import ru.chermenin.kio.cep.nfa.NFA;
import ru.chermenin.kio.cep.pattern.Pattern;

/* compiled from: LateMatchValueDoFn.kt */
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��d\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\u0018��*\b\b��\u0010\u0001*\u00020\u0002*\b\b\u0001\u0010\u0003*\u00020\u00022,\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00030\u0005\u0012\u0016\u0012\u0014\u0012\u0004\u0012\u0002H\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u0002H\u00030\u00060\u00050\u0004B7\u0012\f\u0010\u0007\u001a\b\u0012\u0004\u0012\u00028\u00010\b\u0012\u0006\u0010\t\u001a\u00020\n\u0012\f\u0010\u000b\u001a\b\u0012\u0004\u0012\u00028��0\f\u0012\f\u0010\r\u001a\b\u0012\u0004\u0012\u00028\u00010\f¢\u0006\u0002\u0010\u000eJz\u0010\u001c\u001a\u00020\u001d24\u0010\u001e\u001a00\u001fR,\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0005\u0012\u0016\u0012\u0014\u0012\u0004\u0012\u00028��\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00010\u00060\u00050\u00042\u0012\b\u0001\u0010\u0018\u001a\f\u0012\b\u0012\u0006\u0012\u0002\b\u00030\u001a0\u00192&\b\u0001\u0010\u000f\u001a \u0012\u001c\u0012\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u00050\u00050\u0011H\u0007J\u0084\u0001\u0010 \u001a\u00020\u001d24\u0010\u001e\u001a00!R,\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0005\u0012\u0016\u0012\u0014\u0012\u0004\u0012\u00028��\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00010\u00060\u00050\u00042\u0012\b\u0001\u0010\u0018\u001a\f\u0012\b\u0012\u0006\u0012\u0002\b\u00030\u001a0\u00192&\b\u0001\u0010\u000f\u001a \u0012\u001c\u0012\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u00050\u00050\u00112\b\b\u0001\u0010\u0014\u001a\u00020\"H\u0007R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R:\u0010\u000f\u001a&\u0012\"\u0012 \u0012\u001c\u0012\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u00050\u00050\u00110\u00108\u0006X\u0087\u0004¢\u0006\b\n��\u001a\u0004\b\u0012\u0010\u0013R\u0016\u0010\u0014\u001a\u00020\u00158\u0006X\u0087\u0004¢\u0006\b\n��\u001a\u0004\b\u0016\u0010\u0017R(\u0010\u0018\u001a\u0014\u0012\u000e\u0012\f\u0012\b\u0012\u0006\u0012\u0002\b\u00030\u001a0\u0019\u0018\u00010\u00108\u0006X\u0087\u0004¢\u0006\b\n��\u001a\u0004\b\u001b\u0010\u0013R\u0014\u0010\u0007\u001a\b\u0012\u0004\u0012\u00028\u00010\bX\u0082\u0004¢\u0006\u0002\n��¨\u0006#"}, d2 = {"Lru/chermenin/kio/cep/transforms/LateMatchValueDoFn;", "K", "Ljava/io/Serializable;", "V", "Lorg/apache/beam/sdk/transforms/DoFn;", "Lorg/apache/beam/sdk/values/KV;", "Lru/chermenin/kio/cep/ComplexEvent;", "pattern", "Lru/chermenin/kio/cep/pattern/Pattern;", "allowedLateness", "", "keyClass", "Ljava/lang/Class;", "valueClass", "(Lru/chermenin/kio/cep/pattern/Pattern;JLjava/lang/Class;Ljava/lang/Class;)V", "bufferState", "Lorg/apache/beam/sdk/state/StateSpec;", "Lorg/apache/beam/sdk/state/BagState;", "getBufferState", "()Lorg/apache/beam/sdk/state/StateSpec;", ConstantsKt.LATENESS_TIMER_KEY, "Lorg/apache/beam/sdk/state/TimerSpec;", "getLatenessTimer", "()Lorg/apache/beam/sdk/state/TimerSpec;", "nfaState", "Lorg/apache/beam/sdk/state/ValueState;", "Lru/chermenin/kio/cep/nfa/NFA;", "getNfaState", "onExpire", "", "context", "Lorg/apache/beam/sdk/transforms/DoFn$OnTimerContext;", "processElement", "Lorg/apache/beam/sdk/transforms/DoFn$ProcessContext;", "Lorg/apache/beam/sdk/state/Timer;", "kio-cep"})
/* loaded from: input_file:ru/chermenin/kio/cep/transforms/LateMatchValueDoFn.class */
public final class LateMatchValueDoFn<K extends Serializable, V extends Serializable> extends DoFn<KV<K, V>, KV<K, ComplexEvent<V>>> {

    @DoFn.StateId(ConstantsKt.NFA_STATE_KEY)
    @Nullable
    private final StateSpec<ValueState<NFA<?>>> nfaState;

    @DoFn.StateId(ConstantsKt.BUFFER_STATE_KEY)
    @NotNull
    private final StateSpec<BagState<KV<Long, KV<K, V>>>> bufferState;

    @DoFn.TimerId(ConstantsKt.LATENESS_TIMER_KEY)
    @NotNull
    private final TimerSpec latenessTimer;
    private final Pattern<V> pattern;
    private final long allowedLateness;

    @Nullable
    public final StateSpec<ValueState<NFA<?>>> getNfaState() {
        return this.nfaState;
    }

    @NotNull
    public final StateSpec<BagState<KV<Long, KV<K, V>>>> getBufferState() {
        return this.bufferState;
    }

    @NotNull
    public final TimerSpec getLatenessTimer() {
        return this.latenessTimer;
    }

    @DoFn.ProcessElement
    public final void processElement(@NotNull DoFn<KV<K, V>, KV<K, ComplexEvent<V>>>.ProcessContext processContext, @DoFn.StateId("nfa") @NotNull ValueState<NFA<?>> valueState, @DoFn.StateId("buffer") @NotNull BagState<KV<Long, KV<K, V>>> bagState, @DoFn.TimerId("latenessTimer") @NotNull Timer timer) {
        Intrinsics.checkParameterIsNotNull(processContext, "context");
        Intrinsics.checkParameterIsNotNull(valueState, "nfaState");
        Intrinsics.checkParameterIsNotNull(bagState, "bufferState");
        Intrinsics.checkParameterIsNotNull(timer, ConstantsKt.LATENESS_TIMER_KEY);
        NFA<V> nfa = (NFA) valueState.read();
        if (nfa == null) {
            nfa = this.pattern.compile$kio_cep();
        }
        NFA nfa2 = nfa;
        valueState.write(nfa2);
        Iterable read = bagState.read();
        Intrinsics.checkExpressionValueIsNotNull(read, "bufferState.read()");
        List list = CollectionsKt.toList(read);
        bagState.clear();
        KV kv = (KV) processContext.element();
        List<KV> list2 = list;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list2, 10));
        for (KV kv2 : list2) {
            Intrinsics.checkExpressionValueIsNotNull(kv2, "it");
            Object key = kv2.getKey();
            if (key == null) {
                Intrinsics.throwNpe();
            }
            arrayList.add((Long) key);
        }
        Long l = (Long) CollectionsKt.max(arrayList);
        if (l == null) {
            Instant timestamp = processContext.timestamp();
            Intrinsics.checkExpressionValueIsNotNull(timestamp, "context.timestamp()");
            l = Long.valueOf(timestamp.getMillis());
        }
        Intrinsics.checkExpressionValueIsNotNull(l, "bufferedEvents.map { it.…ontext.timestamp().millis");
        long longValue = l.longValue();
        long j = longValue - this.allowedLateness;
        List list3 = list;
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (Object obj : list3) {
            KV kv3 = (KV) obj;
            Intrinsics.checkExpressionValueIsNotNull(kv3, "it");
            Object key2 = kv3.getKey();
            if (key2 == null) {
                Intrinsics.throwNpe();
            }
            if (((Number) key2).longValue() <= j) {
                arrayList2.add(obj);
            } else {
                arrayList3.add(obj);
            }
        }
        Pair pair = new Pair(arrayList2, arrayList3);
        List list4 = (List) pair.component1();
        Iterator it = ((List) pair.component2()).iterator();
        while (it.hasNext()) {
            bagState.add((KV) it.next());
        }
        Instant timestamp2 = processContext.timestamp();
        Intrinsics.checkExpressionValueIsNotNull(timestamp2, "context.timestamp()");
        bagState.add(KV.of(Long.valueOf(timestamp2.getMillis()), kv));
        for (KV kv4 : CollectionsKt.sortedWith(list4, new Comparator<T>() { // from class: ru.chermenin.kio.cep.transforms.LateMatchValueDoFn$processElement$$inlined$sortedBy$1
            @Override // java.util.Comparator
            public final int compare(T t, T t2) {
                KV kv5 = (KV) t;
                Intrinsics.checkExpressionValueIsNotNull(kv5, "it");
                Long l2 = (Long) kv5.getKey();
                KV kv6 = (KV) t2;
                Intrinsics.checkExpressionValueIsNotNull(kv6, "it");
                return ComparisonsKt.compareValues(l2, (Long) kv6.getKey());
            }
        })) {
            if (nfa2 == null) {
                throw new TypeCastException("null cannot be cast to non-null type ru.chermenin.kio.cep.nfa.NFA<V>");
            }
            Intrinsics.checkExpressionValueIsNotNull(kv4, "it");
            Object value = kv4.getValue();
            Intrinsics.checkExpressionValueIsNotNull(value, "it.value");
            Object value2 = ((KV) value).getValue();
            Intrinsics.checkExpressionValueIsNotNull(value2, "it.value.value");
            Object key3 = kv4.getKey();
            if (key3 == null) {
                Intrinsics.throwNpe();
            }
            Intrinsics.checkExpressionValueIsNotNull(key3, "it.key!!");
            for (Map map : nfa2.process(value2, ((Number) key3).longValue())) {
                Intrinsics.checkExpressionValueIsNotNull(kv, "element");
                processContext.output(KV.of(kv.getKey(), new ComplexEvent(map)));
            }
        }
        timer.set(Instant.ofEpochMilli(longValue + this.allowedLateness));
    }

    @DoFn.OnTimer(ConstantsKt.LATENESS_TIMER_KEY)
    public final void onExpire(@NotNull DoFn<KV<K, V>, KV<K, ComplexEvent<V>>>.OnTimerContext onTimerContext, @DoFn.StateId("nfa") @NotNull ValueState<NFA<?>> valueState, @DoFn.StateId("buffer") @NotNull BagState<KV<Long, KV<K, V>>> bagState) {
        Intrinsics.checkParameterIsNotNull(onTimerContext, "context");
        Intrinsics.checkParameterIsNotNull(valueState, "nfaState");
        Intrinsics.checkParameterIsNotNull(bagState, "bufferState");
        Object read = valueState.read();
        if (read == null) {
            Intrinsics.throwNpe();
        }
        Intrinsics.checkExpressionValueIsNotNull(read, "nfaState.read()!!");
        NFA nfa = (NFA) read;
        Iterable read2 = bagState.read();
        Intrinsics.checkExpressionValueIsNotNull(read2, "bufferState.read()");
        List list = CollectionsKt.toList(read2);
        bagState.clear();
        if (!list.isEmpty()) {
            Object obj = list.get(0);
            Intrinsics.checkExpressionValueIsNotNull(obj, "bufferedEvents[0]");
            Object value = ((KV) obj).getValue();
            Intrinsics.checkExpressionValueIsNotNull(value, "bufferedEvents[0].value");
            Object key = ((KV) value).getKey();
            if (key == null) {
                Intrinsics.throwNpe();
            }
            Intrinsics.checkExpressionValueIsNotNull(key, "bufferedEvents[0].value.key!!");
            Serializable serializable = (Serializable) key;
            for (KV kv : CollectionsKt.sortedWith(list, new Comparator<T>() { // from class: ru.chermenin.kio.cep.transforms.LateMatchValueDoFn$onExpire$$inlined$sortedBy$1
                @Override // java.util.Comparator
                public final int compare(T t, T t2) {
                    KV kv2 = (KV) t;
                    Intrinsics.checkExpressionValueIsNotNull(kv2, "it");
                    Long l = (Long) kv2.getKey();
                    KV kv3 = (KV) t2;
                    Intrinsics.checkExpressionValueIsNotNull(kv3, "it");
                    return ComparisonsKt.compareValues(l, (Long) kv3.getKey());
                }
            })) {
                if (nfa == null) {
                    throw new TypeCastException("null cannot be cast to non-null type ru.chermenin.kio.cep.nfa.NFA<V>");
                }
                Intrinsics.checkExpressionValueIsNotNull(kv, "it");
                Object value2 = kv.getValue();
                Intrinsics.checkExpressionValueIsNotNull(value2, "it.value");
                Object value3 = ((KV) value2).getValue();
                Intrinsics.checkExpressionValueIsNotNull(value3, "it.value.value");
                Object key2 = kv.getKey();
                if (key2 == null) {
                    Intrinsics.throwNpe();
                }
                Intrinsics.checkExpressionValueIsNotNull(key2, "it.key!!");
                Iterator it = nfa.process(value3, ((Number) key2).longValue()).iterator();
                while (it.hasNext()) {
                    onTimerContext.output(KV.of(serializable, new ComplexEvent((Map) it.next())));
                }
            }
        }
    }

    public LateMatchValueDoFn(@NotNull Pattern<V> pattern, long j, @NotNull Class<K> cls, @NotNull Class<V> cls2) {
        Intrinsics.checkParameterIsNotNull(pattern, "pattern");
        Intrinsics.checkParameterIsNotNull(cls, "keyClass");
        Intrinsics.checkParameterIsNotNull(cls2, "valueClass");
        this.pattern = pattern;
        this.allowedLateness = j;
        this.nfaState = StateSpecs.value(SerializableCoder.of(NFA.class));
        StateSpec<BagState<KV<Long, KV<K, V>>>> bag = StateSpecs.bag(KvCoder.of(VarLongCoder.of(), KvCoder.of(SerializableCoder.of(cls), SerializableCoder.of(cls2))));
        Intrinsics.checkExpressionValueIsNotNull(bag, "StateSpecs.bag(\n        …ueClass))\n        )\n    )");
        this.bufferState = bag;
        TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
        Intrinsics.checkExpressionValueIsNotNull(timer, "TimerSpecs.timer(TimeDomain.EVENT_TIME)");
        this.latenessTimer = timer;
    }
}
