package ru.fix.completable.reactor.runtime;

import java.lang.reflect.Constructor;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.fix.aggregating.profiler.ProfiledCall;
import ru.fix.aggregating.profiler.Profiler;
import ru.fix.completable.reactor.graph.Graphable;
import ru.fix.completable.reactor.graph.runtime.RuntimeGraph;
import ru.fix.completable.reactor.runtime.debug.DebugSerializer;
import ru.fix.completable.reactor.runtime.debug.ToStringDebugSerializer;
import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder;
import ru.fix.completable.reactor.runtime.execution.ProcessingVertex;
import ru.fix.completable.reactor.runtime.execution.ReactorGraphExecution;
import ru.fix.completable.reactor.runtime.tracing.Tracer;

/* loaded from: input_file:ru/fix/completable/reactor/runtime/CompletableReactor.class */
public class CompletableReactor implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CompletableReactor.class);
    /** Profiler used to time submissions/executions and to expose the pending request indicator. */
    private final Profiler profiler;
    /** Builds a {@link ReactorGraphExecution} for a registered {@link RuntimeGraph}; created in the constructor. */
    private final ExecutionBuilder glExecutionBuilder;
    /** Serializer handed to the execution builder. */
    private final DebugSerializer debugSerializer = new ToStringDebugSerializer();
    /** When true, {@link Execution} objects returned for graph executions carry the debug vertex state. */
    private boolean debugProcessingVertexGraphState = false;
    /** Payload class to inline handler function; consulted before the graph registry on submit. */
    private final Map<Class<?>, Function> inlinePayloadGraphs = new ConcurrentHashMap();
    /** Scheduler (core pool size 1) on which per-execution timeout checks are scheduled. */
    private final ScheduledExecutorService timeoutExecutorService = newScheduledThreadPool(1, "completable-reactor-check-timeout-");
    /** Default execution timeout used by the overloads that take no explicit timeout (15 minutes). */
    private volatile long executionTimeoutMs = TimeUnit.MINUTES.toMillis(15);
    /** Set by {@link #close()}; checked by the public submit methods. */
    private AtomicBoolean isClosed = new AtomicBoolean();
    /** Per payload class running counters, created lazily on first graph submission. */
    final Map<Class<?>, PayloadStatCounters> payloadStatCounters = new ConcurrentHashMap();
    /** Threshold compared against {@link #pendingRequestCount} for back pressure; unlimited by default. */
    private final AtomicLong maxPendingRequestCount = new AtomicLong(Long.MAX_VALUE);
    /** Number of graph executions in flight; also used as the monitor that close() waits on. */
    private final AtomicLong pendingRequestCount = new AtomicLong();
    /** Maximum time close() waits for pending requests, in milliseconds. */
    private final AtomicLong closeTimeoutMs = new AtomicLong(120000);
    /** Payload class to the runtime graph that processes it. */
    private final ConcurrentHashMap<Class<?>, RuntimeGraph> glPayloadGraphs = new ConcurrentHashMap<>();
    /** Graph config classes that have already been registered. */
    private final ConcurrentHashMap<Class<? extends Graphable>, Boolean> glGraphConfigs = new ConcurrentHashMap<>();
    /** Turns a graph config instance into a {@link RuntimeGraph}. */
    private final GraphBuilder graphBuilder = new GraphBuilder();
    /** Exception-shielding wrapper around the user supplied tracer. */
    private final ReactorTracer reactorTracer = new ReactorTracer();

    /* loaded from: input_file:ru/fix/completable/reactor/runtime/CompletableReactor$Execution.class */
    /**
     * Handle returned to callers for a submitted payload.
     *
     * <p>Holds the future of the payload result, the future of the whole chain execution and an
     * optional collection with debug state.
     *
     * @param <PayloadType> type of the submitted payload
     */
    public static class Execution<PayloadType> {
        final CompletableFuture<Void> chainExecutionFuture;
        final CompletableFuture<PayloadType> resultFuture;
        private final Collection debugProcessingVertexGraphState;

        /**
         * @param resultFuture future that yields the payload result
         * @param chainExecutionFuture future of the whole chain execution
         * @param debugProcessingVertexGraphState debug state, may be {@code null}
         */
        public Execution(
                CompletableFuture<PayloadType> resultFuture,
                CompletableFuture<Void> chainExecutionFuture,
                Collection debugProcessingVertexGraphState) {
            this.resultFuture = resultFuture;
            this.chainExecutionFuture = chainExecutionFuture;
            this.debugProcessingVertexGraphState = debugProcessingVertexGraphState;
        }

        /** @return future of the whole chain execution */
        public CompletableFuture<Void> getChainExecutionFuture() {
            return chainExecutionFuture;
        }

        /** @return future that yields the payload result */
        public CompletableFuture<PayloadType> getResultFuture() {
            return resultFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:ru/fix/completable/reactor/runtime/CompletableReactor$PayloadStatCounters.class */
    /** Mutable pair of counters kept per payload class. */
    public static class PayloadStatCounters {
        final LongAdder runningTotal;
        final LongAdder runningWithoutResult;

        PayloadStatCounters() {
            this.runningTotal = new LongAdder();
            this.runningWithoutResult = new LongAdder();
        }
    }

    /* loaded from: input_file:ru/fix/completable/reactor/runtime/CompletableReactor$PayloadStatisticsReport.class */
    /** Immutable snapshot of the counters of a single payload class. */
    public static class PayloadStatisticsReport {
        final long runningTotal;
        final long runningWithoutResult;

        /**
         * @param runningTotal value stored in {@code runningTotal}
         * @param runningWithoutResult value stored in {@code runningWithoutResult}
         */
        public PayloadStatisticsReport(long runningTotal, long runningWithoutResult) {
            this.runningTotal = runningTotal;
            this.runningWithoutResult = runningWithoutResult;
        }
    }

    /* loaded from: input_file:ru/fix/completable/reactor/runtime/CompletableReactor$ReactorTracer.class */
    /**
     * Wrapper around the user supplied {@link Tracer} that never lets a tracer failure escape.
     *
     * <p>The delegate is stored in a volatile field that can be replaced or cleared at any time.
     * Every method therefore reads the field exactly once into a local variable. When no delegate
     * is installed the call is a no-op ({@code false} / {@code null}) instead of a logged
     * {@link NullPointerException}.
     */
    private static class ReactorTracer implements Tracer {
        private static final Logger log = LoggerFactory.getLogger(ReactorTracer.class);
        private volatile Tracer tracer;

        private ReactorTracer() {
        }

        /**
         * @return the delegate's answer, or {@code false} when there is no delegate or it throws
         */
        @Override // ru.fix.completable.reactor.runtime.tracing.Tracer
        public boolean isTraceable(Object obj) {
            // Single volatile read: the field may be cleared between a null check and the call.
            Tracer delegate = this.tracer;
            if (delegate == null) {
                return false;
            }
            try {
                return delegate.isTraceable(obj);
            } catch (Exception e) {
                log.error("Failed to call isTraceable method of tracer.", e);
                return false;
            }
        }

        /**
         * @return the delegate's result, or {@code null} when there is no delegate or it throws
         */
        @Override // ru.fix.completable.reactor.runtime.tracing.Tracer
        public Object beforeHandle(String str, Object obj) {
            Tracer delegate = this.tracer;
            if (delegate == null) {
                return null;
            }
            try {
                return delegate.beforeHandle(str, obj);
            } catch (Exception e) {
                log.error("Failed to call beforeHandle method of tracer.", e);
                return null;
            }
        }

        /** Forwards to the delegate if one is installed; failures are logged and swallowed. */
        @Override // ru.fix.completable.reactor.runtime.tracing.Tracer
        public void afterHandle(Object obj, String str, Object obj2, Throwable th) {
            Tracer delegate = this.tracer;
            if (delegate == null) {
                return;
            }
            try {
                delegate.afterHandle(obj, str, obj2, th);
            } catch (Exception e) {
                log.error("Failed to call afterHandle method of tracer.", e);
            }
        }

        /**
         * @return the delegate's result, or {@code null} when there is no delegate or it throws
         */
        @Override // ru.fix.completable.reactor.runtime.tracing.Tracer
        public Object beforeMerge(String str, Object obj, Object obj2) {
            Tracer delegate = this.tracer;
            if (delegate == null) {
                return null;
            }
            try {
                return delegate.beforeMerge(str, obj, obj2);
            } catch (Exception e) {
                log.error("Failed to call beforeMerge method of tracer.", e);
                return null;
            }
        }

        /** Forwards to the delegate if one is installed; failures are logged and swallowed. */
        @Override // ru.fix.completable.reactor.runtime.tracing.Tracer
        public void afterMerger(Object obj, String str, Object obj2) {
            Tracer delegate = this.tracer;
            if (delegate == null) {
                return;
            }
            try {
                delegate.afterMerger(obj, str, obj2);
            } catch (Exception e) {
                log.error("Failed to call afterMerger method of tracer.", e);
            }
        }
    }

    /* loaded from: input_file:ru/fix/completable/reactor/runtime/CompletableReactor$StatisticsReport.class */
    /** Snapshot of the per payload class statistics, keyed by payload class. */
    public static class StatisticsReport {
        final Map<Class<?>, PayloadStatisticsReport> payloadStatisticsReports = new HashMap<>();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Registers the graph described by the given config instance unless a config of the same
     * class is already registered.
     *
     * @param graphconfigtype graph config instance, must not be {@code null}
     * @return {@code true} if this call registered the graph, {@code false} if the config class
     *     was already registered
     * @throws IllegalArgumentException if the payload type of the config is already served by a
     *     graph registered through another config
     */
    public <GraphConfigType extends Graphable> boolean registerGraphIfAbsent(GraphConfigType graphconfigtype) {
        Objects.requireNonNull(graphconfigtype, "graphConfig");
        final AtomicBoolean newlyRegistered = new AtomicBoolean(false);
        final Class<? extends Graphable> configClass = graphconfigtype.getClass();
        this.glGraphConfigs.computeIfAbsent(configClass, ignoredKey -> {
            Class<?> payloadType = getPayloadTypeForGraphConfigBasedClass(configClass);
            boolean payloadWasFree =
                    this.glPayloadGraphs.putIfAbsent(payloadType, this.graphBuilder.buildGraph(graphconfigtype)) == null;
            if (payloadWasFree) {
                newlyRegistered.set(true);
                return true;
            }
            throw new IllegalArgumentException("New graph config: " + graphconfigtype + " of type " + configClass + " is registering graph for payload " + payloadType + ". But this payload was already registered by another graph config.");
        });
        return newlyRegistered.get();
    }

    /**
     * Registers the graph described by the given config instance.
     *
     * <p>The config is passed to {@code registerGraphIfAbsent} as is. The previous cast of the
     * config to {@code CompletableReactor} matched no overload and could never succeed at runtime.
     *
     * @param graphconfigtype graph config instance, must not be {@code null}
     * @throws IllegalStateException if a config of the same class is already registered
     */
    public <GraphConfigType extends Graphable> void registerGraph(GraphConfigType graphconfigtype) {
        if (!registerGraphIfAbsent(graphconfigtype)) {
            throw new IllegalStateException("Graph '" + graphconfigtype.getClass().getName() + "' already registered.");
        }
    }

    /**
     * Registers the graph described by the given config class.
     *
     * @param cls graph config class
     * @throws IllegalStateException if the config class is already registered
     */
    public <GraphConfigType extends Graphable> void registerGraph(Class<GraphConfigType> cls) {
        boolean registered = registerGraphIfAbsent(cls);
        if (registered) {
            return;
        }
        throw new IllegalStateException("Graph '" + cls.getName() + "' already registered.");
    }

    /**
     * Instantiates the given graph config class through its no-arg constructor and registers the
     * resulting graph unless the config class is already registered.
     *
     * <p>Like the instance based overload, registration fails if the payload type is already
     * served by a graph from another config; previously the duplicate was silently dropped while
     * the call still reported success. Only constructor lookup and invocation are reported as
     * instantiation failures; graph building errors are reported as registration failures.
     *
     * @param cls graph config class, must not be {@code null}
     * @return {@code true} if this call registered the graph, {@code false} if the config class
     *     was already registered
     * @throws RuntimeException if the payload type cannot be resolved, the config cannot be
     *     instantiated, the graph cannot be built or the payload type is already registered
     */
    public <GraphConfigType extends Graphable> boolean registerGraphIfAbsent(Class<GraphConfigType> cls) {
        Objects.requireNonNull(cls, "graphConfigClass");
        AtomicBoolean registered = new AtomicBoolean(false);
        this.glGraphConfigs.computeIfAbsent(cls, ignoredKey -> {
            try {
                Class<?> payloadType = getPayloadTypeForGraphConfigBasedClass(cls);
                GraphConfigType graphConfig;
                try {
                    Constructor<GraphConfigType> constructor = cls.getDeclaredConstructor();
                    // Allows private / package-private no-arg constructors.
                    constructor.setAccessible(true);
                    graphConfig = constructor.newInstance();
                } catch (Exception e) {
                    throw new IllegalArgumentException("Failed to instantiate graph config instance of class: " + cls + ". Graph config class should have default no arg constructor. If it is inner class then it sould be static.", e);
                }
                if (this.glPayloadGraphs.putIfAbsent(payloadType, this.graphBuilder.buildGraph(graphConfig)) != null) {
                    throw new IllegalArgumentException("New graph config of type " + cls + " is registering graph for payload " + payloadType + ". But this payload was already registered by another graph config.");
                }
                registered.set(true);
                return true;
            } catch (Exception e) {
                throw new RuntimeException("Failed to register graph config in ReactorGraph: " + cls, e);
            }
        });
        return registered.get();
    }

    /**
     * Resolves the payload class from the single generic type argument of the graph config's
     * generic superclass.
     *
     * <p>Starting at {@code cls}, the hierarchy is walked upwards until a class is found whose
     * direct superclass directly implements {@link Graphable} (or the top of the hierarchy is
     * reached). The generic superclass of that class must be parameterized with exactly one
     * argument that is a {@link Class}.
     *
     * @param cls graph config class
     * @return the payload class
     * @throws IllegalArgumentException if any of the above expectations is not met
     */
    private Class getPayloadTypeForGraphConfigBasedClass(Class cls) {
        if (cls.getSuperclass() == null) {
            throw new IllegalArgumentException("Superclass of graph config class " + cls + " is NULL. Graph config class should extend GraphConfig<PayloadType>.");
        }
        if (!Graphable.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException("Superclass of graph config class " + cls + " is not GraphConfig<PayloadType>. Superclass of graph config class should be GraphConfig<PayloadType>. Actual superclass of graph config is: " + cls.getSuperclass());
        }

        // Climb until the direct parent is the class that directly implements Graphable.
        Class holder = cls;
        for (Class parent = holder.getSuperclass();
                parent != null && !Arrays.asList(parent.getInterfaces()).contains(Graphable.class);
                parent = holder.getSuperclass()) {
            holder = parent;
        }

        Type genericParent = holder.getGenericSuperclass();
        if (genericParent == null) {
            throw new IllegalArgumentException("Generic Superclass of graph config class " + cls + " is NULL. Superclass of graph config class should be generic GraphConfig<PayloadType>.");
        }
        if (!(genericParent instanceof ParameterizedType)) {
            throw new IllegalArgumentException("Superclass of graph config class " + cls + " is not ParameterizedType Superclass of graph config class should be GraphConfig<PayloadType>. Actual generic superclass of graph config is: " + cls.getGenericSuperclass());
        }

        Type[] typeArguments = ((ParameterizedType) genericParent).getActualTypeArguments();
        if (typeArguments.length != 1) {
            throw new IllegalArgumentException("Superclass of graph config class " + cls + " should have 1 generic type parameter. Superclass of graph config class should be GraphConfig<PayloadType>. Actual super class is : " + cls.getGenericSuperclass() + " Actual parameter count of superclass is : " + typeArguments.length);
        }

        Type payloadType = typeArguments[0];
        if (payloadType == null) {
            throw new IllegalArgumentException("Superclass of graph config class " + cls + " should have 1 non null generic type parameter. Superclass of graph config class should be GraphConfig<PayloadType>. Actual super class is : " + cls.getGenericSuperclass() + " Actual generic parameter of superclass is null.");
        }
        if (!(payloadType instanceof Class)) {
            throw new IllegalArgumentException("Superclass of graph config " + cls + " should have generic type parameter of type Class. Superclass of graph config class should be GraphConfig<PayloadType>. Actual super class is : " + cls.getGenericSuperclass() + " Actual generic parameter type is : " + payloadType.getClass());
        }
        return (Class) payloadType;
    }

    /**
     * Creates a reactor that reports to the given profiler.
     *
     * <p>Attaches the pending request counter to the profiler as an indicator and creates the
     * execution builder. The builder receives a callback that submits a payload with the current
     * default timeout and converts a synchronous submission failure into an exceptionally
     * completed future.
     *
     * @param profiler profiler used for indicators and profiled calls
     */
    public CompletableReactor(Profiler profiler) {
        this.profiler = profiler;
        profiler.attachIndicator(Metrics.PENDING_REQUEST, this.pendingRequestCount::get);
        this.glExecutionBuilder = new ExecutionBuilder(
                this.profiler,
                payload -> {
                    try {
                        return internalSubmit(payload, this.executionTimeoutMs).getResultFuture();
                    } catch (Exception submitFailure) {
                        CompletableFuture failed = new CompletableFuture();
                        failed.completeExceptionally(submitFailure);
                        return failed;
                    }
                },
                this.debugSerializer,
                this.reactorTracer);
    }

    /**
     * Enables or disables attaching the debug vertex state to returned {@link Execution} objects.
     *
     * <p>The flag now follows the argument; previously it was set to {@code true} regardless of
     * the value passed, so the feature could not be switched off again.
     *
     * @param z {@code true} to attach debug state, {@code false} to omit it
     * @return this reactor, for call chaining
     */
    public CompletableReactor setDebugProcessingVertexGraphState(boolean z) {
        this.debugProcessingVertexGraphState = z;
        return this;
    }

    /** @return the current pending request threshold */
    public long getMaxPendingRequestCount() {
        return maxPendingRequestCount.get();
    }

    /** @return the current value of the pending request counter */
    public long getPendingRequestCount() {
        return pendingRequestCount.get();
    }

    /**
     * Replaces the pending request threshold.
     *
     * @param j new threshold
     * @return this reactor, for call chaining
     */
    public CompletableReactor setMaxPendingRequestCount(long j) {
        maxPendingRequestCount.set(j);
        return this;
    }

    /** @return maximum time {@code close()} waits for pending requests, in milliseconds */
    public long getCloseTimeoutMs() {
        return closeTimeoutMs.get();
    }

    /** @param j maximum time {@code close()} waits for pending requests, in milliseconds */
    public void setCloseTimeoutMs(long j) {
        closeTimeoutMs.set(j);
    }

    /** @return default execution timeout in milliseconds */
    public long getExecutionTimeoutMs() {
        return executionTimeoutMs;
    }

    /** @param j default execution timeout in milliseconds */
    public void setExecutionTimeoutMs(long j) {
        executionTimeoutMs = j;
    }

    /** @param tracer tracer to install as the delegate of the internal tracer wrapper */
    public void setTracer(Tracer tracer) {
        reactorTracer.tracer = tracer;
    }

    /** @return the currently installed tracer, or {@code null} if none is installed */
    public Tracer getTracer() {
        return reactorTracer.tracer;
    }

    /** Clears the installed tracer. */
    public void removeTracer() {
        reactorTracer.tracer = null;
    }

    /**
     * Registers an inline handler for the given payload class, replacing any handler previously
     * registered for that class.
     *
     * @param cls payload class
     * @param function handler that produces the future of the processed payload
     */
    public <PayloadType> void registerGraph(Class<PayloadType> cls, Function<PayloadType, CompletableFuture<PayloadType>> function) {
        inlinePayloadGraphs.put(cls, function);
    }

    /**
     * Registers a synchronous inline handler for the given payload class. The handler's return
     * value is wrapped into an already completed future.
     *
     * @param cls payload class
     * @param function synchronous handler
     */
    public <PayloadType> void registerGraphSync(Class<PayloadType> cls, Function<PayloadType, PayloadType> function) {
        registerGraph(cls, payload -> CompletableFuture.completedFuture(function.apply(payload)));
    }

    /**
     * Creates a scheduled executor whose threads are named {@code str} followed by a running
     * number and which removes cancelled tasks from its queue.
     *
     * @param i core pool size
     * @param str thread name prefix
     * @return the configured executor
     */
    private static ScheduledThreadPoolExecutor newScheduledThreadPool(int i, String str) {
        final AtomicInteger threadCounter = new AtomicInteger();
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
                i,
                task -> new Thread(task, str + threadCounter.getAndIncrement()));
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }

    /**
     * Builds a report holding the current sums of the counters of every payload class seen so far.
     *
     * @return a new report
     */
    public StatisticsReport buildStatisticsReport() {
        StatisticsReport report = new StatisticsReport();
        for (Map.Entry<Class<?>, PayloadStatCounters> entry : this.payloadStatCounters.entrySet()) {
            PayloadStatCounters counters = entry.getValue();
            long total = counters.runningTotal.sum();
            long withoutResult = counters.runningWithoutResult.sum();
            report.payloadStatisticsReports.put(entry.getKey(), new PayloadStatisticsReport(total, withoutResult));
        }
        return report;
    }

    /**
     * Same as {@link #trySubmit(Object, long)} with the default execution timeout.
     *
     * @param payloadtype payload to process
     * @return the execution, or empty if the payload was rejected
     */
    public <PayloadType> Optional<Execution<PayloadType>> trySubmit(PayloadType payloadtype) {
        return trySubmit(payloadtype, executionTimeoutMs);
    }

    /**
     * Submits the payload unless the pending request counter currently exceeds the configured
     * maximum.
     *
     * @param payloadtype payload to process
     * @param j execution timeout in milliseconds
     * @return the execution, or empty if the payload was rejected
     */
    public <PayloadType> Optional<Execution<PayloadType>> trySubmit(PayloadType payloadtype, long j) {
        if (pendingRequestCount.get() > maxPendingRequestCount.get()) {
            return Optional.empty();
        }
        return Optional.of(submit(payloadtype, j));
    }

    /**
     * Same as {@link #submit(Object, long)} with the default execution timeout.
     *
     * @param payloadtype payload to process
     * @return the execution handle
     */
    public <PayloadType> Execution<PayloadType> submit(PayloadType payloadtype) {
        return submit(payloadtype, executionTimeoutMs);
    }

    /**
     * Submits the payload for processing.
     *
     * @param payloadtype payload to process
     * @param j execution timeout in milliseconds
     * @return the execution handle
     * @throws IllegalStateException if the reactor has been closed
     */
    public <PayloadType> Execution<PayloadType> submit(PayloadType payloadtype, long j) {
        if (!isClosed.get()) {
            return internalSubmit(payloadtype, j);
        }
        throw new IllegalStateException("CompletableReactor is closed. Payload " + payloadtype + " is discarded.");
    }

    /**
     * Starts processing of a payload without checking whether the reactor is closed.
     *
     * <p>If an inline handler is registered for the payload class it is invoked directly and no
     * pending counter, statistics or timeout are involved. Otherwise the registered graph is
     * executed: the pending request counter and the per payload statistics are updated, the
     * payload is fed into the execution and a timeout task is scheduled that fails the result and
     * chain futures if they are still incomplete after {@code j} milliseconds.
     *
     * @param payloadtype payload to process, must not be {@code null}
     * @param j execution timeout in milliseconds
     * @return the execution handle
     * @throws NullPointerException if the payload is {@code null}
     * @throws IllegalArgumentException if no handler or graph is registered for the payload class
     */
    private <PayloadType> Execution<PayloadType> internalSubmit(PayloadType payloadtype, long j) {
        Objects.requireNonNull(payloadtype, "payload");
        if (this.pendingRequestCount.get() > this.maxPendingRequestCount.get()) {
            log.error("Max pending request count is reached. Request will be accepted but there is a possibility of OOM or something wrong with back pressure logic in client code.\nUse trySubmit method that supports back pressure or correctly handle the load on CompletableReactor on client side.");
        }
        ProfiledCall start = this.profiler.profiledCall(ProfilerIdentity.submitIdentity(payloadtype.getClass().getName())).start();
        ProfiledCall start2 = this.profiler.profiledCall(ProfilerIdentity.executionIdentity(payloadtype.getClass().getName())).start();
        Function function = this.inlinePayloadGraphs.get(payloadtype.getClass());
        if (function != null) {
            CompletableFuture completableFuture = (CompletableFuture) function.apply(payloadtype);
            completableFuture.whenCompleteAsync((obj, th) -> {
                start.stop();
                start2.stop();
            });
            return new Execution<>(completableFuture, completableFuture.thenAccept(obj2 -> {
            }), null);
        }
        RuntimeGraph runtimeGraph = this.glPayloadGraphs.get(payloadtype.getClass());
        if (runtimeGraph == null) {
            throw new IllegalArgumentException("Rector graph not found for payload " + payloadtype.getClass());
        }
        ReactorGraphExecution<PayloadType> build = this.glExecutionBuilder.build(runtimeGraph);
        this.pendingRequestCount.incrementAndGet();
        PayloadStatCounters computeIfAbsent = this.payloadStatCounters.computeIfAbsent(payloadtype.getClass(), cls -> {
            return new PayloadStatCounters();
        });
        // Both counters are decremented by the handlers below, so both must be incremented
        // here; otherwise runningWithoutResult drifts below zero.
        computeIfAbsent.runningTotal.increment();
        computeIfAbsent.runningWithoutResult.increment();
        build.getChainExecutionFuture().handleAsync((r3, th2) -> {
            computeIfAbsent.runningTotal.decrement();
            return null;
        });
        build.getResultFuture().handleAsync((obj3, th3) -> {
            computeIfAbsent.runningWithoutResult.decrement();
            return null;
        });
        build.getSubmitFuture().complete(payloadtype);
        ScheduledFuture<?> schedule = this.timeoutExecutorService.schedule(() -> {
            // The execution state dump is part of the timeout message.
            if (!build.getResultFuture().isDone()) {
                build.getResultFuture().completeExceptionally(new TimeoutException("Response for payload " + payloadtype + " took more than " + j + " ms.\n" + dumpExecutionState(build)));
            }
            if (!build.getChainExecutionFuture().isDone()) {
                build.getChainExecutionFuture().completeExceptionally(new TimeoutException("Execution of payload " + payloadtype + " took more than " + j + " ms.\n" + dumpExecutionState(build)));
            }
        }, j, TimeUnit.MILLISECONDS);
        build.getChainExecutionFuture().handleAsync((r7, th4) -> {
            if (this.pendingRequestCount.decrementAndGet() == 0) {
                // Wakes up close(), which waits on this monitor for the counter to reach zero.
                synchronized (this.pendingRequestCount) {
                    this.pendingRequestCount.notifyAll();
                }
            }
            schedule.cancel(false);
            return null;
        });
        build.getResultFuture().whenCompleteAsync((obj4, th5) -> {
            start.stop();
        });
        build.getChainExecutionFuture().whenCompleteAsync((r32, th6) -> {
            start2.stop();
        });
        return new Execution<>(build.getResultFuture(), build.getChainExecutionFuture(), this.debugProcessingVertexGraphState ? build.getDebugProcessingVertexGraphState() : null);
    }

    /**
     * Renders the state of an execution for diagnostics.
     *
     * @param reactorGraphExecution execution to describe
     * @return a placeholder if the debug state is empty or its first element is not a
     *     {@link ProcessingVertex}; otherwise the dump produced by the execution builder
     */
    private <PayloadType> String dumpExecutionState(ReactorGraphExecution<PayloadType> reactorGraphExecution) {
        Collection vertices = reactorGraphExecution.getDebugProcessingVertexGraphState();
        if (vertices.isEmpty()) {
            return "(no vertices)";
        }
        if (!(vertices.iterator().next() instanceof ProcessingVertex)) {
            return "(invalid type of debug state)";
        }
        return this.glExecutionBuilder.dumpExecutionState(reactorGraphExecution);
    }

    /**
     * Closes the reactor: rejects further public submissions, waits up to the configured close
     * timeout for pending requests to finish and then shuts down the timeout scheduler.
     *
     * <p>The scheduler is shut down in a {@code finally} block so that its worker thread is
     * released even if the wait is interrupted. An orderly {@code shutdown()} is used; under the
     * executor's default policy, already scheduled timeout checks of executions that are still
     * pending will still run.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.isClosed.set(true);
        try {
            if (this.pendingRequestCount.get() > 0) {
                log.info("Closing Completable Reactor. Waiting for pending requests: {}. Max waiting time {} ms", this.pendingRequestCount.get(), this.closeTimeoutMs.get());
                long deadline = System.currentTimeMillis() + this.closeTimeoutMs.get();
                synchronized (this.pendingRequestCount) {
                    while (this.pendingRequestCount.get() > 0) {
                        long remaining = deadline - System.currentTimeMillis();
                        if (remaining <= 0) {
                            break;
                        }
                        // Notified by the completion handler when the counter reaches zero.
                        this.pendingRequestCount.wait(remaining);
                    }
                }
                if (this.pendingRequestCount.get() > 0) {
                    log.error("Completable Reactor forced to be closed due to timeout. There are {} pending request left.", this.pendingRequestCount.get());
                } else {
                    log.info("Completable Reactor closed without any pending request left to process.");
                }
            }
        } finally {
            // Without this the scheduler's non-daemon worker thread outlives the reactor.
            this.timeoutExecutorService.shutdown();
        }
    }
}
