package io.mats3.util;

import io.mats3.MatsEndpoint;
import io.mats3.MatsFactory;
import io.mats3.MatsInitiator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/mats3/util/MatsFuturizer.class */
public class MatsFuturizer implements AutoCloseable {
    // Prefix that most log messages in this class start with (the literal is repeated inline in the messages).
    private static final String LOG_PREFIX = "#MATS-UTIL# ";
    // MDC key for the trace id; set while completing, timing out and cancelling promises.
    public static final String MDC_TRACE_ID = "traceId";
    // MDC key set to the promise's 'from' value by the completion and timeout tasks.
    public static final String MDC_MATS_INIT_ID = "mats.init.Id";
    // MDC key set in _completeFuture to the total milliseconds from initiation until the future was completed.
    public static final String MDC_MATS_FUTURE_COMPLETED = "mats.FutureCompleted";
    // MDC key set in _completeFuture to the Reply's round-trip time in milliseconds.
    public static final String MDC_MATS_FUTURE_TIME_RTT = "mats.future.rtt.ms";
    // MDC key set in _completeFuture to the milliseconds spent inside future.complete(..).
    public static final String MDC_MATS_FUTURE_TIME_COMPLETING = "mats.future.completing.ms";
    // MDC key set by the timeout tasks to the milliseconds elapsed since the promise was initiated.
    public static final String MDC_MATS_FUTURE_TIMEOUT = "mats.FutureTimeout";
    // The MatsFactory handed to the constructor.
    protected final MatsFactory _matsFactory;
    // Initiator obtained from the factory in the constructor; used to send the requests.
    protected final MatsInitiator _matsInitiator;
    // Id of the reply-handling endpoint; requests are sent with replyToSubscription(..) pointing at it.
    protected final String _terminatorEndpointId;
    // Pool on which futures are completed and timed out (created by _newThreadPool).
    protected final ThreadPoolExecutor _futureCompleterThreadPool;
    // Upper limit on simultaneously outstanding promises, enforced in _enqueuePromise.
    protected final int _maxOutstandingPromises;
    // Endpoint created via subscriptionTerminator(..) that feeds replies to _handleRepliesForPromises.
    protected final MatsEndpoint<Void, String> _replyHandlerEndpoint;
    // Set to true by _assertFuturizerRunning once waitForReceiving(..) has succeeded.
    protected volatile boolean _replyHandlerEndpointStarted;
    // The promise the timeouter thread is currently waiting on; _enqueuePromise compares against it to decide whether to signal.
    protected Promise<?> _nextInLineToTimeout;
    private static final Logger log = LoggerFactory.getLogger(MatsFuturizer.class);
    // Separate logger ("<class name>.Reply") used for the per-reply completion log line.
    private static final Logger log_reply = LoggerFactory.getLogger(MatsFuturizer.class.getName() + ".Reply");
    // Counter used to number the completer threads' names.
    protected final AtomicInteger _threadNumber = new AtomicInteger();
    // Lock taken around access to _correlationIdToPromiseMap and _timeoutSortedPromises.
    protected final ReentrantLock _internalStateLock = new ReentrantLock();
    // Condition of _internalStateLock: awaited by the timeouter thread, signalled on enqueue and on close().
    protected final Condition _timeouterPing_InternalStateLock = this._internalStateLock.newCondition();
    // Outstanding promises keyed by correlation id.
    protected final HashMap<String, Promise<?>> _correlationIdToPromiseMap = new HashMap<>();
    // The same outstanding promises, ordered by Promise.compareTo (timeout timestamp first).
    protected final PriorityQueue<Promise<?>> _timeoutSortedPromises = new PriorityQueue<>();
    // Cleared by close(); the timeouter thread loops while it is true.
    protected volatile boolean _runFlag = true;

    /* loaded from: input_file:io/mats3/util/MatsFuturizer$MatsFuturizerTimeoutException.class */
    /**
     * Exception used to exceptionally complete a future whose promise timed out. In addition to the message it
     * carries the initiation timestamp and the trace id of the request it belongs to.
     */
    public static class MatsFuturizerTimeoutException extends RuntimeException {
        private final long initiationTimestamp;
        private final String traceId;

        /**
         * @param str
         *            the exception message.
         * @param j
         *            the initiation timestamp of the request.
         * @param str2
         *            the trace id of the request.
         */
        public MatsFuturizerTimeoutException(String str, long j, String str2) {
            super(str);
            traceId = str2;
            initiationTimestamp = j;
        }

        /**
         * @return the trace id given at construction.
         */
        public String getTraceId() {
            return traceId;
        }

        /**
         * @return the initiation timestamp given at construction.
         */
        public long getInitiationTimestamp() {
            return initiationTimestamp;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/mats3/util/MatsFuturizer$Promise.class */
    /**
     * Book-keeping record for one outstanding request: identifies the request (trace id, correlation id, from, to),
     * records when it was initiated and when it times out, and holds the reply class and the future to complete.
     * <p>
     * Promises order by timeout timestamp, with the correlation id as tie-breaker, so that a sorted collection has
     * the promise that times out first at its head.
     */
    public static class Promise<T> implements Comparable<Promise<?>> {
        public final String _traceId;
        public final String _correlationId;
        public final String _from;
        public final String _to;
        public final long _initiationTimestamp;
        public final long _initiationNanos;
        public final long _timeoutTimestamp;
        public final Class<T> _replyClass;
        public final CompletableFuture<Reply<T>> _future;

        /**
         * @param str
         *            trace id.
         * @param str2
         *            correlation id.
         * @param str3
         *            'from' (initiator id).
         * @param str4
         *            'to' (target endpoint id).
         * @param j
         *            initiation timestamp (millis).
         * @param j2
         *            initiation time in nanos, used for round-trip measurements.
         * @param j3
         *            timestamp (millis) at which the promise times out.
         * @param cls
         *            class the reply is deserialized to.
         * @param completableFuture
         *            the future handed to the caller.
         */
        public Promise(String str, String str2, String str3, String str4, long j, long j2, long j3, Class<T> cls, CompletableFuture<Reply<T>> completableFuture) {
            this._traceId = str;
            this._correlationId = str2;
            this._from = str3;
            this._to = str4;
            this._initiationTimestamp = j;
            this._initiationNanos = j2;
            this._timeoutTimestamp = j3;
            this._replyClass = cls;
            this._future = completableFuture;
        }

        /**
         * Orders by timeout timestamp, then by correlation id.
         */
        @Override // java.lang.Comparable
        public int compareTo(Promise<?> promise) {
            // Long.compare instead of "sign of the difference": the subtraction overflows for timestamps that are
            // far apart, which would flip the ordering.
            int byTimeout = Long.compare(this._timeoutTimestamp, promise._timeoutTimestamp);
            if (byTimeout != 0) {
                return byTimeout;
            }
            return this._correlationId.compareTo(promise._correlationId);
        }
    }

    /* loaded from: input_file:io/mats3/util/MatsFuturizer$Reply.class */
    /**
     * What a future is completed with: the reply object, the context it arrived with, and timing information
     * (initiation timestamp, initiation nanos and the round-trip time in nanos).
     */
    public static class Reply<T> {
        private static final Logger log = LoggerFactory.getLogger(Reply.class);
        public final MatsEndpoint.DetachedProcessContext context;
        public final T reply;
        public final long initiationTimestamp;
        public final long initiationNanos;
        public final long roundTripNanos;

        /**
         * Deprecated constructor - logs a warning on every use. Leaves {@link #initiationNanos} and
         * {@link #roundTripNanos} at zero.
         *
         * @param detachedProcessContext
         *            the context of the reply.
         * @param t
         *            the reply object.
         * @param j
         *            the initiation timestamp.
         */
        public Reply(MatsEndpoint.DetachedProcessContext detachedProcessContext, T t, long j) {
            log.warn("#MATS-UTIL# HARD WARNING - DEPRECATION!! Using the new Reply(context, reply, initiationTimestamp) constructor is deprecated, use Reply.forTest(context, reply) instead!");
            context = detachedProcessContext;
            reply = t;
            initiationTimestamp = j;
            initiationNanos = 0L;
            roundTripNanos = 0L;
        }

        /**
         * Internal constructor; the round-trip time is computed here as "now" (System.nanoTime()) minus the
         * supplied initiation nanos.
         */
        private Reply(MatsEndpoint.DetachedProcessContext detachedProcessContext, T t, long j, long j2) {
            context = detachedProcessContext;
            reply = t;
            initiationTimestamp = j;
            initiationNanos = j2;
            roundTripNanos = System.nanoTime() - j2;
        }

        /**
         * Creates a Reply without context, with initiation timestamp and initiation nanos both zero.
         *
         * @param t
         *            the reply object.
         * @return the new Reply.
         */
        public static <T> Reply<T> forTest(T t) {
            return forTest(null, t);
        }

        /**
         * Creates a Reply with the given context, with initiation timestamp and initiation nanos both zero.
         *
         * @param detachedProcessContext
         *            the context to expose from the Reply.
         * @param t
         *            the reply object.
         * @return the new Reply.
         */
        public static <T> Reply<T> forTest(MatsEndpoint.DetachedProcessContext detachedProcessContext, T t) {
            return new Reply<>(detachedProcessContext, t, 0L, 0L);
        }

        /**
         * @return the context this Reply was created with.
         */
        public MatsEndpoint.DetachedProcessContext getContext() {
            return context;
        }

        /**
         * Deprecated accessor - logs a warning and then delegates to {@link #get()}.
         *
         * @return the reply object.
         */
        public T getReply() {
            log.warn("#MATS-UTIL# HARD WARNING - DEPRECATION!! Using Reply.getReply() is deprecated, use Reply.get()!");
            return get();
        }

        /**
         * @return the reply object.
         */
        public T get() {
            return reply;
        }

        /**
         * @return the initiation timestamp.
         */
        public long getInitiationTimestamp() {
            return initiationTimestamp;
        }

        /**
         * @return the initiation time in nanos.
         */
        public long getInitiationNanos() {
            return initiationNanos;
        }

        /**
         * @return the round-trip time in nanos.
         */
        public long getRoundTripNanos() {
            return roundTripNanos;
        }
    }

    /**
     * Creates a MatsFuturizer using the factory's configured app name as the endpoint id prefix.
     *
     * @param matsFactory
     *            the factory to create the futurizer on.
     * @return the new MatsFuturizer.
     * @throws IllegalArgumentException
     *             if the app name is null or blank.
     */
    public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory) {
        final String endpointIdPrefix = matsFactory.getFactoryConfig().getAppName();
        final boolean usable = endpointIdPrefix != null && !endpointIdPrefix.trim().isEmpty();
        if (usable) {
            return createMatsFuturizer(matsFactory, endpointIdPrefix);
        }
        throw new IllegalArgumentException("The matsFactory.getFactoryConfig().getAppName() returns [" + endpointIdPrefix
                + "], which is not allowed to use as endpointIdPrefix (null or blank).");
    }

    /**
     * Creates a MatsFuturizer with pool sizes derived from the factory's concurrency: core pool size is the larger
     * of 5 and concurrency * 4, max pool size the larger of 100 and concurrency * 20, and at most 50000
     * outstanding promises.
     *
     * @param matsFactory
     *            the factory to create the futurizer on.
     * @param str
     *            the endpoint id prefix.
     * @return the new MatsFuturizer.
     */
    public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, String str) {
        final int corePoolSize = Math.max(5, matsFactory.getFactoryConfig().getConcurrency() * 4);
        final int maxPoolSize = Math.max(100, matsFactory.getFactoryConfig().getConcurrency() * 20);
        final int maxOutstandingPromises = 50000;
        return createMatsFuturizer(matsFactory, str, corePoolSize, maxPoolSize, maxOutstandingPromises);
    }

    /**
     * Creates a MatsFuturizer with every setting given explicitly.
     *
     * @param matsFactory
     *            the factory to create the futurizer on.
     * @param str
     *            the endpoint id prefix.
     * @param i
     *            core size of the completer thread pool.
     * @param i2
     *            max size of the completer thread pool.
     * @param i3
     *            max number of outstanding promises.
     * @return the new MatsFuturizer.
     */
    public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, String str, int i, int i2, int i3) {
        final MatsFuturizer futurizer = new MatsFuturizer(matsFactory, str, i, i2, i3);
        return futurizer;
    }

    /**
     * Sets up the futurizer: sanitizes the prefix, obtains the initiator, creates the completer thread pool,
     * registers the reply-handling endpoint and starts the timeouter thread.
     *
     * @param matsFactory
     *            the factory to create the initiator and reply endpoint on.
     * @param str
     *            the endpoint id prefix (sanitized before use).
     * @param i
     *            core size of the completer thread pool.
     * @param i2
     *            max size of the completer thread pool.
     * @param i3
     *            max number of outstanding promises.
     * @throws IllegalArgumentException
     *             if the sanitized prefix is null or blank.
     */
    protected MatsFuturizer(MatsFactory matsFactory, String str, int i, int i2, int i3) {
        this._matsFactory = matsFactory;

        final String prefix = SanitizeMqNames.sanitizeName(str);
        final boolean prefixUnusable = prefix == null || prefix.trim().isEmpty();
        if (prefixUnusable) {
            throw new IllegalArgumentException("The sanitized endpointIdPrefix (orig:[" + str
                    + "]) is not allowed to use as endpointIdPrefix (null or blank).");
        }

        this._matsInitiator = matsFactory.getOrCreateInitiator(prefix + ".Futurizer.init");
        // The reply endpoint id includes the node name of the factory.
        final String nodename = matsFactory.getFactoryConfig().getNodename();
        this._terminatorEndpointId = prefix + ".Futurizer.private.repliesFor." + nodename;
        this._futureCompleterThreadPool = _newThreadPool(i, i2);
        this._maxOutstandingPromises = i3;
        this._replyHandlerEndpoint = matsFactory.subscriptionTerminator(this._terminatorEndpointId, String.class,
                MatsEndpoint.MatsObject.class, this::_handleRepliesForPromises);
        _startTimeouterThread();

        log.info("#MATS-UTIL# MatsFuturizer created. EndpointIdPrefix:[" + prefix + "], corePoolSize:[" + i
                + "], maxPoolSize:[" + i2 + "], maxOutstandingPromises:[" + i3 + "]");
    }

    /**
     * Sends a request and returns a future for its reply. A promise is created and enqueued (so that it can be
     * matched against the reply or timed out) before the request is sent.
     *
     * @param charSequence
     *            the trace id of the request.
     * @param str
     *            the 'from' (initiator id) of the request.
     * @param str2
     *            the 'to' (target endpoint id) of the request.
     * @param i
     *            the timeout, in the given unit.
     * @param timeUnit
     *            unit of the timeout.
     * @param cls
     *            class the reply should be deserialized to.
     * @param obj
     *            the request object.
     * @param initiateLambda
     *            additional initiation set-up, invoked after the standard properties are set and before the request
     *            is sent.
     * @return the future that will be completed with the reply.
     */
    public <T> CompletableFuture<Reply<T>> futurize(CharSequence charSequence, String str, String str2, int i, TimeUnit timeUnit, Class<T> cls, Object obj, MatsInitiator.InitiateLambda initiateLambda) {
        final Promise<T> promise = _createPromise(charSequence.toString(), str, str2, cls, i, timeUnit);
        _assertFuturizerRunning();
        _enqueuePromise(promise);
        _sendRequestToFulfillPromise(str, str2, charSequence.toString(), obj, initiateLambda, promise);
        final CompletableFuture<Reply<T>> future = promise._future;
        return future;
    }

    /**
     * Variant of the full futurize(..) method using a timeout of 150 seconds.
     *
     * @param charSequence
     *            the trace id of the request.
     * @param str
     *            the 'from' (initiator id) of the request.
     * @param str2
     *            the 'to' (target endpoint id) of the request.
     * @param cls
     *            class the reply should be deserialized to.
     * @param obj
     *            the request object.
     * @param initiateLambda
     *            additional initiation set-up.
     * @return the future that will be completed with the reply.
     */
    public <T> CompletableFuture<Reply<T>> futurize(CharSequence charSequence, String str, String str2, Class<T> cls, Object obj, MatsInitiator.InitiateLambda initiateLambda) {
        final int timeoutSeconds = 150;
        return futurize(charSequence, str, str2, timeoutSeconds, TimeUnit.SECONDS, cls, obj, initiateLambda);
    }

    /**
     * Sends a request with a 150 second timeout whose initiation is additionally configured with
     * nonPersistent(180000), interactive() and noAudit().
     *
     * @param charSequence
     *            the trace id of the request.
     * @param str
     *            the 'from' (initiator id) of the request.
     * @param str2
     *            the 'to' (target endpoint id) of the request.
     * @param cls
     *            class the reply should be deserialized to.
     * @param obj
     *            the request object.
     * @return the future that will be completed with the reply.
     */
    public <T> CompletableFuture<Reply<T>> futurizeNonessential(CharSequence charSequence, String str, String str2, Class<T> cls, Object obj) {
        final MatsInitiator.InitiateLambda nonessentialSetup =
                init -> init.nonPersistent(180000L).interactive().noAudit();
        return futurize(charSequence, str, str2, 150, TimeUnit.SECONDS, cls, obj, nonessentialSetup);
    }

    /**
     * @return the number of promises currently registered, read while holding the internal state lock.
     */
    public int getOutstandingPromiseCount() {
        final ReentrantLock lock = this._internalStateLock;
        lock.lock();
        try {
            final int outstanding = this._correlationIdToPromiseMap.size();
            return outstanding;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the thread pool on which futures are completed and timed out.
     */
    public ThreadPoolExecutor getCompleterThreadPool() {
        final ThreadPoolExecutor completerPool = this._futureCompleterThreadPool;
        return completerPool;
    }

    /**
     * Creates the completer thread pool. The work queue's {@code offer(..)} is overridden to
     * {@code tryTransfer(..)}, and the rejected-execution handler puts rejected tasks onto that same queue. Idle
     * threads are kept for 5 minutes, and threads are named "MatsFuturizer completer #n".
     *
     * @param i
     *            core pool size.
     * @param i2
     *            max pool size.
     * @return the new thread pool.
     */
    protected ThreadPoolExecutor _newThreadPool(int i, int i2) {
        // offer(..) only succeeds when the task can be transferred right away.
        final LinkedTransferQueue<Runnable> handoffQueue = new LinkedTransferQueue<Runnable>() {
            @Override
            public boolean offer(Runnable task) {
                return tryTransfer(task);
            }
        };
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(i, i2, 5L, TimeUnit.MINUTES, handoffQueue,
                task -> new Thread(task, "MatsFuturizer completer #" + this._threadNumber.getAndIncrement()));
        // Rejected tasks are put on the executor's queue instead of being dropped.
        pool.setRejectedExecutionHandler((rejectedTask, executor) -> {
            LinkedTransferQueue<Runnable> queue = (LinkedTransferQueue<Runnable>) executor.getQueue();
            queue.put(rejectedTask);
        });
        return pool;
    }

    /**
     * Creates the Promise for a new request: generates a random correlation id, records the initiation time
     * (millis and nanos), computes the timeout timestamp and creates the future.
     *
     * @param str
     *            the trace id.
     * @param str2
     *            the 'from' (initiator id).
     * @param str3
     *            the 'to' (target endpoint id).
     * @param cls
     *            class the reply should be deserialized to.
     * @param i
     *            the timeout, in the given unit.
     * @param timeUnit
     *            unit of the timeout.
     * @return the new, not yet enqueued, Promise.
     * @throws IllegalArgumentException
     *             if the timeout converted to milliseconds is zero or negative.
     */
    protected <T> Promise<T> _createPromise(String str, String str2, String str3, Class<T> cls, int i, TimeUnit timeUnit) {
        long timeoutMillis = timeUnit.toMillis(i);
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("Timeout in milliseconds cannot be zero or negative [" + timeoutMillis + "].");
        }
        String correlationId = RandomString.randomCorrelationId();
        long initiationTimestamp = System.currentTimeMillis();
        // Properly parameterized (was a raw CompletableFuture, passed unchecked into the Promise).
        CompletableFuture<Reply<T>> future = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("#MATS-UTIL# Creating Promise for TraceId [" + str + "], from [" + str2 + "], to [" + str3
                    + "], timeout in [" + timeoutMillis + "] millis.");
        }
        return new Promise<>(str, correlationId, str2, str3, initiationTimestamp, System.nanoTime(),
                initiationTimestamp + timeoutMillis, cls, future);
    }

    /**
     * Registers the promise in both the correlation-id map and the timeout-sorted queue, under the internal state
     * lock. If the head of the queue afterwards is not the promise the timeouter thread is waiting on, the
     * timeouter is signalled.
     *
     * @param promise
     *            the promise to register.
     * @throws IllegalStateException
     *             if the number of outstanding promises has reached the configured limit.
     */
    protected <T> void _enqueuePromise(Promise<T> promise) {
        final ReentrantLock lock = this._internalStateLock;
        lock.lock();
        try {
            final int outstanding = this._correlationIdToPromiseMap.size();
            if (outstanding >= this._maxOutstandingPromises) {
                throw new IllegalStateException("There are too many Promises outstanding, so cannot add more - limit is ["
                        + this._maxOutstandingPromises + "].");
            }
            this._correlationIdToPromiseMap.put(promise._correlationId, promise);
            this._timeoutSortedPromises.add(promise);

            // Wake the timeouter if what it is waiting on is no longer the first to time out.
            final Promise<?> head = this._timeoutSortedPromises.peek();
            if (head != this._nextInLineToTimeout) {
                this._timeouterPing_InternalStateLock.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Asserts that this futurizer can be used: it must not have been closed, and the reply-handling endpoint must
     * have started receiving (waited for at most 60 seconds, the first time only).
     * <p>
     * The shut-down check is done <i>before</i> waiting for the endpoint, so that using a closed futurizer fails
     * right away with the shut-down message rather than first waiting for an endpoint that close() has removed.
     *
     * @throws IllegalStateException
     *             if this futurizer is shut down, or if the reply-handling endpoint did not start.
     */
    protected void _assertFuturizerRunning() {
        if (!this._runFlag) {
            throw new IllegalStateException("This MatsFuturizer [" + this._terminatorEndpointId + "] is shut down.");
        }
        if (!this._replyHandlerEndpointStarted) {
            boolean receiving = this._replyHandlerEndpoint.waitForReceiving(60000);
            // close() may have been invoked while we waited - report that as the cause if so.
            if (!this._runFlag) {
                throw new IllegalStateException("This MatsFuturizer [" + this._terminatorEndpointId + "] is shut down.");
            }
            if (!receiving) {
                throw new IllegalStateException("The Reply Handler SubscriptionTerminator Endpoint would not start.");
            }
            this._replyHandlerEndpointStarted = true;
        }
    }

    /**
     * Sends the request: sets trace id, from and to, directs the reply to this futurizer's reply endpoint with the
     * promise's correlation id, then runs the caller-supplied initiation lambda, and finally issues the request.
     *
     * @param str
     *            the 'from' (initiator id).
     * @param str2
     *            the 'to' (target endpoint id).
     * @param str3
     *            the trace id.
     * @param obj
     *            the request object.
     * @param initiateLambda
     *            additional initiation set-up, run before the request is issued.
     * @param promise
     *            the promise whose correlation id the reply will carry.
     */
    protected <T> void _sendRequestToFulfillPromise(String str, String str2, String str3, Object obj, MatsInitiator.InitiateLambda initiateLambda, Promise<T> promise) {
        this._matsInitiator.initiateUnchecked(init -> {
            // Standard set-up first, so that the custom lambda runs after it.
            init.traceId(str3)
                    .from(str)
                    .to(str2)
                    .replyToSubscription(this._terminatorEndpointId, promise._correlationId);
            initiateLambda.initiate(init);
            init.request(obj);
        });
    }

    /**
     * Handles an incoming reply: looks up (and removes) the promise registered under the correlation id, and hands
     * deserialization of the reply and completion of the future over to the completer thread pool. If no promise
     * is registered for the correlation id, this is logged and the reply is dropped.
     * <p>
     * The internal state lock is only held while the promise is removed from the map and the timeout queue; the
     * logging and the hand-off to the thread pool happen after it is released.
     *
     * @param processContext
     *            the context of the incoming reply.
     * @param str
     *            the correlation id the reply carries.
     * @param matsObject
     *            the not yet deserialized reply.
     */
    protected void _handleRepliesForPromises(MatsEndpoint.ProcessContext<Void> processContext, String str, MatsEndpoint.MatsObject matsObject) {
        final Promise<?> promise;
        this._internalStateLock.lock();
        try {
            promise = this._correlationIdToPromiseMap.remove(str);
            if (promise != null) {
                this._timeoutSortedPromises.remove(promise);
            }
        } finally {
            this._internalStateLock.unlock();
        }

        // No promise registered for this correlation id: nothing to complete.
        if (promise == null) {
            MDC.put(MDC_TRACE_ID, processContext.getTraceId());
            try {
                log.info("#MATS-UTIL# Promise gone! Got reply from [" + processContext.getFromStageId()
                        + "] for Future with traceId:[" + processContext.getTraceId()
                        + "], but the Promise had timed out.");
            } finally {
                MDC.remove(MDC_TRACE_ID);
            }
            return;
        }

        this._futureCompleterThreadPool.execute(() -> {
            try {
                MDC.put(MDC_TRACE_ID, promise._traceId);
                MDC.put(MDC_MATS_INIT_ID, promise._from);
                if (log.isDebugEnabled()) {
                    log.debug("#MATS-UTIL# Completing promise from [" + promise._from + "]: [" + promise + "]");
                }

                // Deserialization failures are reported as such, and given to the caller through the future.
                Object reply;
                try {
                    reply = _deserializeReply(matsObject, promise._replyClass);
                } catch (Throwable t) {
                    log.error("Got problems completing Future due to failing to deserialize the incoming object to expected class ["
                            + promise._replyClass.getName() + "], thus doing future.completeExceptionally(..) with the ["
                            + t.getClass().getSimpleName() + "]. Initiated from [" + promise._from
                            + "], with reply from [" + processContext.getFromStageId() + "], traceId ["
                            + processContext.getTraceId() + "]", t);
                    promise._future.completeExceptionally(t);
                    return;
                }

                _completeFuture(processContext, reply, promise);
            } catch (Throwable t) {
                log.error("#MATS-UTIL# Got problems completing Future initiated from [" + promise._from
                        + "], with reply from [" + processContext.getFromStageId() + "], traceId:["
                        + processContext.getTraceId() + "]", t);
                // The promise is no longer registered, so nothing else will ever complete this future. This is a
                // no-op if the future was already completed before the problem occurred.
                promise._future.completeExceptionally(t);
            } finally {
                MDC.clear();
            }
        });
    }

    /**
     * Converts the incoming reply to the requested class by delegating to {@code matsObject.toClass(..)}.
     *
     * @param matsObject
     *            the incoming reply.
     * @param cls
     *            the class to convert it to.
     * @return the converted reply.
     */
    protected Object _deserializeReply(MatsEndpoint.MatsObject matsObject, Class<?> cls) {
        final Object deserialized = matsObject.toClass(cls);
        return deserialized;
    }

    /**
     * Completes the promise's future with a {@link Reply} wrapping the given reply object. If the reply logger has
     * info enabled, timings are also put on the MDC and a log line is written: total time since initiation, the
     * Reply's round-trip time, and the time spent inside {@code future.complete(..)} - all in milliseconds with
     * three decimals.
     *
     * @param processContext
     *            the context of the incoming reply.
     * @param obj
     *            the deserialized reply, possibly null.
     * @param promise
     *            the promise whose future is to be completed.
     */
    protected void _completeFuture(MatsEndpoint.ProcessContext<Void> processContext, Object obj, Promise<?> promise) {
        Reply<Object> reply = new Reply<>(processContext, obj, promise._initiationTimestamp, promise._initiationNanos);
        // The Promise is only known as Promise<?> here, so its future cannot accept a Reply directly. The reply
        // object was deserialized to the promise's own reply class, so viewing the future as one of Reply<Object>
        // is safe.
        @SuppressWarnings("unchecked")
        CompletableFuture<Reply<Object>> future = (CompletableFuture<Reply<Object>>) (CompletableFuture<?>) promise._future;

        if (!log_reply.isInfoEnabled()) {
            future.complete(reply);
            return;
        }

        long nanosBeforeCompleting = System.nanoTime();
        future.complete(reply);
        long nanosAfterCompleting = System.nanoTime();

        // Nanos -> millis, rounded to three decimals.
        double msRoundTrip = Math.round(reply.roundTripNanos / 1000.0d) / 1000.0d;
        double msCompleting = Math.round((nanosAfterCompleting - nanosBeforeCompleting) / 1000.0d) / 1000.0d;
        double msTotal = Math.round((nanosAfterCompleting - promise._initiationNanos) / 1000.0d) / 1000.0d;

        MDC.put(MDC_MATS_FUTURE_COMPLETED, Double.toString(msTotal));
        MDC.put(MDC_MATS_FUTURE_TIME_RTT, Double.toString(msRoundTrip));
        MDC.put(MDC_MATS_FUTURE_TIME_COMPLETING, Double.toString(msCompleting));
        log_reply.info("#MATS-UTIL# Completed Future from initiatorId [" + promise._from + "] with answer from ["
                + processContext.getFromStageId()
                + (obj != null ? "], with instance of [" + obj.getClass().getSimpleName() + "]" : "], which was null")
                + " - Total:[" + msTotal + " ms], Mats RTT:[" + msRoundTrip + " ms].");
    }

    /**
     * Starts the "MatsFuturizer Timeouter" thread, which runs until the run flag is cleared.
     * <p>
     * Each round, while holding the internal state lock, it removes every promise at the head of the timeout queue
     * whose timeout timestamp has passed. If there are none, it waits on the condition until the head promise is
     * due (or 30 seconds if the queue is empty), or until it is signalled. After releasing the lock, it submits one
     * task per timed out promise to the completer thread pool; the task logs the timeout and invokes
     * {@link #_timeoutCompleteExceptionally(Promise, String)}.
     */
    protected void _startTimeouterThread() {
        new Thread(() -> {
            log.info("#MATS-UTIL# MatsFuturizer Timeouter-thread: Started!");
            while (this._runFlag) {
                ArrayList<Promise<?>> timedOut = new ArrayList<>();

                // :: Collect timed out promises, or sleep until the next one is due. The lock is held for the whole
                // inner loop (await(..) releases it while waiting), and always released in the finally.
                this._internalStateLock.lock();
                try {
                    while (this._runFlag) {
                        try {
                            long now = System.currentTimeMillis();
                            Promise<?> head = this._timeoutSortedPromises.peek();
                            long sleepMillis;
                            if (head == null) {
                                // Nothing outstanding.
                                this._nextInLineToTimeout = null;
                                sleepMillis = 30000;
                            } else if (now >= head._timeoutTimestamp) {
                                if (log.isDebugEnabled()) {
                                    log.debug("#MATS-UTIL# Promise at head of timeout queue HAS timed out ["
                                            + (now - head._timeoutTimestamp) + "] millis ago - traceId ["
                                            + head._traceId + "].");
                                }
                                this._timeoutSortedPromises.remove();
                                this._correlationIdToPromiseMap.remove(head._correlationId);
                                timedOut.add(head);
                                // Check whether the next in line has timed out too.
                                continue;
                            } else {
                                this._nextInLineToTimeout = head;
                                sleepMillis = head._timeoutTimestamp - now;
                                if (log.isDebugEnabled()) {
                                    log.debug("#MATS-UTIL# Promise at head of timeout queue has NOT timed out, will time out in ["
                                            + sleepMillis + "] millis - traceId [" + head._traceId + "].");
                                }
                            }

                            // Anything to time out? Then leave the lock and do that.
                            if (!timedOut.isEmpty()) {
                                break;
                            }

                            boolean debugSleep = log.isDebugEnabled() && this._nextInLineToTimeout != null;
                            long sleepStartNanos = 0;
                            if (debugSleep) {
                                sleepStartNanos = System.nanoTime();
                                log.debug("#MATS-UTIL# Will now go to sleep for [" + sleepMillis + "] millis.");
                            }
                            this._timeouterPing_InternalStateLock.await(sleepMillis, TimeUnit.MILLISECONDS);
                            if (debugSleep) {
                                double sleptMillis = (System.nanoTime() - sleepStartNanos) / 1000000.0d;
                                log.debug("#MATS-UTIL# .. slept [" + sleptMillis + "] millis (should have slept ["
                                        + sleepMillis + "] millis, difference [" + (sleptMillis - sleepMillis)
                                        + "] millis too much).");
                            }
                        } catch (Throwable t) {
                            log.error("#MATS-UTIL# Got an unexpected Throwable in the promise-timeouter-thread. Loop and check whether to exit.", t);
                            if (!this._runFlag) {
                                break;
                            }
                            // Back off to avoid tight-looping on a recurring problem. Waiting on the condition
                            // (instead of sleeping) releases the lock meanwhile, and lets close() wake us.
                            try {
                                this._timeouterPing_InternalStateLock.await(10000L, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException ignored) {
                                // Deliberately ignored: the loop re-checks the run flag.
                            }
                        }
                    }
                } finally {
                    this._internalStateLock.unlock();
                }

                // :: Time out the collected promises, on the completer thread pool.
                int size = timedOut.size();
                if (log.isDebugEnabled()) {
                    log.debug("#MATS-UTIL# Will now timeout [" + size + "] Promise(s).");
                }
                for (Promise<?> promise : timedOut) {
                    this._futureCompleterThreadPool.execute(() -> {
                        try {
                            // Nanos -> millis, rounded to three decimals.
                            double msSinceInit = Math.round((System.nanoTime() - promise._initiationNanos) / 1000.0d) / 1000.0d;
                            MDC.put(MDC_TRACE_ID, promise._traceId);
                            MDC.put(MDC_MATS_INIT_ID, promise._from);
                            MDC.put(MDC_MATS_FUTURE_TIMEOUT, Double.toString(msSinceInit));
                            long timeoutMillis = promise._timeoutTimestamp - promise._initiationTimestamp;
                            String message = "The Promise/Future timed out! It was initiated from:[" + promise._from
                                    + "] with traceId:[" + promise._traceId + "], to:[" + promise._to
                                    + "] Initiation was [" + msSinceInit + " ms] ago, and its specified timeout was:["
                                    + timeoutMillis + "].";
                            log.warn("#MATS-UTIL# " + message);
                            _timeoutCompleteExceptionally(promise, message);
                        } catch (Throwable t) {
                            log.error("#MATS-UTIL# Got problems timing out Promise/Future initiated from:[" + promise._from
                                    + "] with traceId:[" + promise._traceId + "], ignoring.", t);
                        } finally {
                            MDC.clear();
                        }
                    });
                    MDC.clear();
                    // With a handful of timeouts, space the submissions slightly apart.
                    if (size > 1 && size < 10) {
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException ignored) {
                            // Deliberately ignored: just continue with the next promise.
                        }
                    }
                }
                timedOut.clear();
            }
            log.info("MatsFuturizer Timeouter-thread: We got asked to exit, and that we do!");
        }, "MatsFuturizer Timeouter").start();
    }

    /**
     * Completes the promise's future exceptionally with a {@link MatsFuturizerTimeoutException} carrying the given
     * message and the promise's initiation timestamp and trace id.
     *
     * @param promise
     *            the promise that timed out.
     * @param str
     *            the exception message.
     */
    protected void _timeoutCompleteExceptionally(Promise<?> promise, String str) {
        final MatsFuturizerTimeoutException timeout =
                new MatsFuturizerTimeoutException(str, promise._initiationTimestamp, promise._traceId);
        promise._future.completeExceptionally(timeout);
    }

    /**
     * Shuts down this futurizer: clears the run flag, removes the reply-handling endpoint, shuts down the completer
     * thread pool, wakes the timeouter thread, and cancels the futures of all outstanding promises. Invoking it
     * when the run flag is already cleared only logs.
     * <p>
     * The internal state lock is only held while the outstanding promises are moved out of the internal
     * structures; the futures are cancelled after it is released, since cancelling runs whatever the callers have
     * attached to them.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this._runFlag) {
            log.info("MatsFuturizer.close() invoked, but runFlag is already false, thus it has already been closed.");
            return;
        }
        log.info("MatsFuturizer.close() invoked: Shutting down & removing reply-handler-endpoint, shutting down future-completer-threadpool, timeouter-thread, and cancelling any outstanding futures.");
        this._runFlag = false;
        this._replyHandlerEndpoint.remove(5000);
        this._futureCompleterThreadPool.shutdown();

        // :: Move all outstanding promises out of the internal state, and wake the timeouter so it can exit.
        ArrayList<Promise<?>> outstanding = new ArrayList<>();
        this._internalStateLock.lock();
        try {
            outstanding.addAll(this._timeoutSortedPromises);
            this._timeoutSortedPromises.clear();
            this._correlationIdToPromiseMap.clear();
            this._timeouterPing_InternalStateLock.signalAll();
        } finally {
            this._internalStateLock.unlock();
        }

        // :: Cancel them, one problem not preventing the rest from being cancelled.
        for (Promise<?> promise : outstanding) {
            try {
                MDC.put(MDC_TRACE_ID, promise._traceId);
                promise._future.cancel(true);
            } catch (Throwable t) {
                log.error("#MATS-UTIL# Got problems cancelling (due to shutdown) Promise/Future initiated from:["
                        + promise._from + "] with traceId:[" + promise._traceId + "]", t);
            } finally {
                MDC.remove(MDC_TRACE_ID);
            }
        }
    }
}
