package org.swisspush.redisques.performance;

import io.vertx.core.Vertx;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/swisspush/redisques/performance/UpperBoundParallel.class */
/**
 * Runs a sequence of asynchronous tasks supplied by a {@link Mentor} while
 * bounding the number of tasks in flight by the permits of a caller-supplied
 * {@link Semaphore}.
 *
 * <p>For every task one permit is taken with {@link Semaphore#tryAcquire()}
 * before {@link Mentor#runOneMore} is called, and released again when the
 * completion callback handed to the mentor is invoked. When no permit is
 * available and nothing of this request is in flight, a retry is scheduled via
 * {@link Vertx#setTimer} after {@value #RETRY_DELAY_IF_LIMIT_REACHED_MS} ms.</p>
 */
public class UpperBoundParallel {
    private static final Logger log = LoggerFactory.getLogger(UpperBoundParallel.class);
    private static final long RETRY_DELAY_IF_LIMIT_REACHED_MS = 8;
    private final Vertx vertx;

    /**
     * Callbacks through which a caller supplies tasks and learns about their
     * outcome.
     *
     * @param <Ctx> type of the opaque context object passed back on every call
     */
    public interface Mentor<Ctx> {
        /**
         * Starts one more task.
         *
         * <p>The given callback has to be invoked once the started task is
         * complete, passing the failure (or {@code null} on success) as first
         * argument; this is what frees the permit taken for the task. If this
         * method throws a {@link RuntimeException}, the task is treated as
         * completed with that exception, exactly as if the callback had been
         * invoked with it.</p>
         *
         * @param biConsumer completion callback for the task being started
         * @param ctx        the context passed to {@link UpperBoundParallel#request}
         * @return {@code true} if there are more tasks to start, {@code false}
         *         if this was the last one
         */
        boolean runOneMore(BiConsumer<Throwable, Void> biConsumer, Ctx ctx);

        /**
         * Called when a task completed with a non-null throwable.
         *
         * @param th  the failure reported for the task
         * @param ctx the context passed to {@link UpperBoundParallel#request}
         * @return {@code true} to treat the error as fatal, which stops any
         *         further {@link #runOneMore} calls for this request
         */
        boolean onError(Throwable th, Ctx ctx);

        /**
         * Called at most once, when {@link #runOneMore} reported that there is
         * nothing more to start and no task is in flight any longer.
         *
         * @param ctx the context passed to {@link UpperBoundParallel#request}
         */
        void onDone(Ctx ctx);
    }

    /**
     * Bookkeeping for a single {@link UpperBoundParallel#request} call. The
     * mutable fields are meant to be accessed while holding {@link #lock}.
     */
    public final class Request<Ctx> {
        private final Ctx ctx;
        private final Mentor<Ctx> mentor;
        private final Semaphore limit;
        private final Lock lock = new ReentrantLock();
        /** Thread currently executing the scheduling loop, or null if none. */
        private Thread worker = null;
        /** Number of started tasks whose completion callback is still pending. */
        private int numInProgress = 0;
        /** Last value returned by {@link Mentor#runOneMore}. */
        private boolean hasMore = true;
        /** Last value returned by {@link Mentor#onError}. */
        private boolean isFatalError = false;
        /** Guards against calling {@link Mentor#onDone} more than once. */
        private boolean isDoneCalled = false;

        private Request(Ctx ctx, Mentor<Ctx> mentor, Semaphore semaphore) {
            this.ctx = ctx;
            this.mentor = mentor;
            this.limit = semaphore;
        }

        /**
         * Completion callback handed to {@link Mentor#runOneMore}; forwards to
         * the enclosing instance.
         *
         * @param th failure of the completed task, or null on success
         * @param r6 unused
         */
        public void onOneDone_(Throwable th, Void r6) {
            UpperBoundParallel.this.onOneDone(this, th);
        }
    }

    /**
     * @param vertx used to schedule retries and follow-up scheduling rounds;
     *              must not be null (checked by an assertion only)
     */
    public UpperBoundParallel(Vertx vertx) {
        assert vertx != null;
        this.vertx = vertx;
    }

    /**
     * Starts processing the tasks supplied by {@code mentor}.
     *
     * @param semaphore limits how many tasks may be in flight; one permit is
     *                  held per running task
     * @param ctx       opaque value passed back to every mentor callback
     * @param mentor    supplies the tasks and receives error/done notifications
     * @param <Ctx>     type of {@code ctx}
     */
    public <Ctx> void request(Semaphore semaphore, Ctx ctx, Mentor<Ctx> mentor) {
        resume(new Request<>(ctx, mentor, semaphore));
    }

    /**
     * Scheduling loop: starts as many tasks as permits are available, and
     * reports completion to the mentor once everything has finished.
     *
     * <p>The request lock is held while the bookkeeping fields are inspected,
     * but released around every mentor callback. Whatever way this method is
     * left once the lock was obtained, the outer {@code finally} clears
     * {@code worker} and releases the lock exactly once.</p>
     */
    private <Ctx> void resume(Request<Ctx> request) {
        if (!request.lock.tryLock()) {
            log.trace("Some other thread already working here");
            return;
        }
        try {
            Thread currentThread = Thread.currentThread();
            if (request.worker == null) {
                log.trace("worker := ourself");
                request.worker = currentThread;
            } else if (request.worker != currentThread) {
                log.trace("Another thread is already working here");
                return;
            }
            while (!request.isFatalError) {
                if (!request.hasMore) {
                    if (request.numInProgress == 0 && !request.isDoneCalled) {
                        request.isDoneCalled = true;
                        // Do not hold the lock while running foreign code.
                        request.lock.unlock();
                        log.debug("call 'mentor.onDone()'");
                        try {
                            request.mentor.onDone(request.ctx);
                        } finally {
                            // Re-acquire so the outer finally can release it.
                            request.lock.lock();
                        }
                    } else {
                        log.trace("return for now (hasMore = {}, numInProgress = {})",
                                request.hasMore, request.numInProgress);
                    }
                    return;
                }
                if (!request.limit.tryAcquire()) {
                    log.debug("redis request limit reached. Need to pause now.");
                    assert request.numInProgress >= 0 : request.numInProgress;
                    if (request.numInProgress == 0) {
                        // Nothing of ours is in flight, so no completion
                        // callback will wake us up again: poll via timer.
                        this.vertx.setTimer(RETRY_DELAY_IF_LIMIT_REACHED_MS, timerId -> resume(request));
                    }
                    return;
                }
                request.numInProgress++;
                boolean hasMore = true;
                try {
                    // Do not hold the lock while running foreign code.
                    request.lock.unlock();
                    log.trace("mentor.runOneMore()  numInProgress={}", request.numInProgress);
                    hasMore = request.mentor.runOneMore(request::onOneDone_, request.ctx);
                } catch (RuntimeException e) {
                    // A throwing mentor counts as a task that completed with an error.
                    onOneDone(request, e);
                } finally {
                    log.trace("mentor.runOneMore() -> hasMore={}", hasMore);
                    // Re-acquire before touching shared state and looping on.
                    request.lock.lock();
                    request.hasMore = hasMore;
                }
            }
            log.debug("return from 'resume()' because isFatalError");
        } finally {
            request.worker = null;
            request.lock.unlock();
        }
    }

    /**
     * Handles completion of one task: frees its permit, updates the in-flight
     * counter, reports a failure to the mentor (if any) and schedules another
     * {@link #resume} round on the Vert.x context.
     *
     * @param request the request the completed task belongs to
     * @param th      failure of the task, or null on success
     */
    private <Ctx> void onOneDone(Request<Ctx> request, Throwable th) {
        request.lock.lock();
        try {
            request.limit.release();
            request.numInProgress--;
            log.trace("onOneDone({})  {} remaining", th != null ? "ex" : "null", request.numInProgress);
            assert request.numInProgress >= 0
                    : request.numInProgress + " >= 0  (BTW: mentor MUST call 'onDone' EXACTLY once)";
            if (th != null) {
                // Stays true if the mentor itself throws from onError.
                boolean isFatalError = true;
                try {
                    // Do not hold the lock while running foreign code.
                    request.lock.unlock();
                    if (log.isDebugEnabled()) {
                        log.debug("mentor.onError({}: {})", th.getClass().getName(), th.getMessage());
                    }
                    isFatalError = request.mentor.onError(th, request.ctx);
                } finally {
                    request.lock.lock();
                    request.isFatalError = isFatalError;
                }
            }
        } finally {
            request.lock.unlock();
            this.vertx.runOnContext(unused -> resume(request));
        }
    }
}
