package org.swisspush.redisques.performance;

import io.vertx.core.Vertx;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;

/* loaded from: input_file:org/swisspush/redisques/performance/UpperBoundParallel.class */
public class UpperBoundParallel {
    /** Class-wide SLF4J logger; assigned in the static initializer at the bottom of this class. */
    private static final Logger log;
    /** Vert.x instance; {@code onOneDone} uses it to schedule the next {@code resume} call via {@code runOnContext}. */
    private final Vertx vertx;
    /**
     * Exception factory handed to the constructor. In the code visible here it is only stored;
     * NOTE(review): the undecompiled {@code resume} method may be its actual consumer - confirm.
     */
    private final RedisQuesExceptionFactory exceptionFactory;
    /**
     * Decompiler rendering of the compiler-generated assertion flag. The static initializer sets it
     * to the negation of {@code UpperBoundParallel.class.desiredAssertionStatus()}, so the explicit
     * {@code if (!$assertionsDisabled && ...) throw new AssertionError(...)} checks in this class
     * only fire when assertions are enabled.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/swisspush/redisques/performance/UpperBoundParallel$Mentor.class */
    /**
     * Callback contract supplied by the caller of {@code request(...)}; it is stored in a
     * {@link Request} and invoked with that request's context object.
     *
     * <p>NOTE(review): the method that drives {@code runOneMore} and {@code onDone}
     * ({@code resume}) was not decompiled, so the descriptions of those two callbacks below are
     * partly inferred from names and from the JADX fragments - confirm against the original sources.
     *
     * @param <Ctx> type of the context object handed back to every callback
     */
    public interface Mentor<Ctx> {
        /**
         * Asked to start one more unit of work.
         *
         * <p>NOTE(review): presumably {@code biConsumer} must be invoked when that unit finishes
         * (with a non-null {@link Throwable} on failure) - the assertion message in
         * {@code onOneDone} says the mentor must call it exactly once. The meaning of the
         * returned boolean is not visible here; presumably it feeds {@code Request.hasMore} - confirm.
         *
         * @param biConsumer completion callback for the started unit of work
         * @param ctx        the context object passed to {@code request(...)}
         * @return see note above; semantics not derivable from the visible code
         */
        boolean runOneMore(BiConsumer<Throwable, Void> biConsumer, Ctx ctx);

        /**
         * Called by {@code onOneDone} (without the request lock held) when a unit of work
         * completed with a non-null throwable.
         *
         * @param th  the reported failure
         * @param ctx the context object passed to {@code request(...)}
         * @return {@code true} to treat the failure as non-fatal; {@code false} makes
         *         {@code onOneDone} set {@code Request.isFatalError} and hand one token back to
         *         the semaphore immediately
         */
        boolean onError(Throwable th, Ctx ctx);

        /**
         * Completion notification. According to the JADX fragments preceding {@code resume}, it is
         * invoked once {@code numInProgress} is 0 and {@code isDoneCalled} was still false, with the
         * request lock released for the duration of the call.
         *
         * @param ctx the context object passed to {@code request(...)}
         */
        void onDone(Ctx ctx);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/performance/UpperBoundParallel$Request.class */
    /**
     * Mutable bookkeeping for a single {@code request(...)} call. Instances are created only by
     * the enclosing class (private constructor); {@code onOneDone} mutates the counters and flags
     * while holding {@link #lock}.
     *
     * @param <Ctx> type of the caller-supplied context object
     */
    public static final class Request<Ctx> {
        // --- collaborators, fixed at construction time ---
        /** Caller-supplied context, passed back to every mentor callback. */
        private final Ctx ctx;
        /** Callbacks supplied by the caller. */
        private final Mentor<Ctx> mentor;
        /** Semaphore that tokens are returned to via {@code release(...)}. */
        private final Semaphore limit;
        /** Held by {@code onOneDone} while it updates the mutable state below. */
        private final Lock lock = new ReentrantLock();

        // --- mutable state; everything except hasMore starts at its Java default ---
        /** Starts out {@code true}; only read (for logging) in the visible decompiler fragments. */
        private boolean hasMore = true;
        /** Cleared to {@code null} by {@code resume} before it returns (per the JADX fragments). */
        private Thread worker;
        /** Decremented by {@code onOneDone} for every completion; asserted to never drop below 0. */
        private int numInProgress;
        /** Incremented per completion; decremented again whenever a token is released to {@link #limit}. */
        private int numTokensAvailForOurself;
        /** Not touched by any code visible in this file. */
        private boolean hasStarted;
        /** Set by {@code onOneDone} when {@code mentor.onError} returns {@code false} or throws. */
        private boolean isFatalError;
        /** Set right before {@code mentor.onDone} is invoked (per the JADX fragments). */
        private boolean isDoneCalled;

        private Request(Ctx ctx, Mentor<Ctx> mentor, Semaphore limit) {
            this.limit = limit;
            this.mentor = mentor;
            this.ctx = ctx;
        }
    }

    public UpperBoundParallel(Vertx vertx, RedisQuesExceptionFactory redisQuesExceptionFactory) {
        if (!$assertionsDisabled && vertx == null) {
            throw new AssertionError();
        }
        this.vertx = vertx;
        this.exceptionFactory = redisQuesExceptionFactory;
    }

    /**
     * Entry point: wraps the arguments into a fresh {@link Request} and hands it to
     * {@code resume} right away, on the calling thread.
     *
     * @param semaphore semaphore stored as the request's {@code limit}
     * @param ctx       context object passed back to every mentor callback
     * @param mentor    callbacks for this request
     * @param <Ctx>     type of the context object
     */
    public <Ctx> void request(Semaphore semaphore, Ctx ctx, Mentor<Ctx> mentor) {
        final Request<Ctx> freshRequest = new Request<>(ctx, mentor, semaphore);
        resume(freshRequest);
    }

    /* JADX WARN: Code restructure failed: missing block: B:149:0x00c0, code lost:
    
        if (((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).numInProgress != 0) goto L47;
     */
    /* JADX WARN: Code restructure failed: missing block: B:151:0x00c7, code lost:
    
        if (((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).isDoneCalled != false) goto L47;
     */
    /* JADX WARN: Code restructure failed: missing block: B:152:0x00ca, code lost:
    
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).isDoneCalled = true;
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).lock.unlock();
        org.swisspush.redisques.performance.UpperBoundParallel.log.trace("call 'mentor.onDone()'");
     */
    /* JADX WARN: Code restructure failed: missing block: B:154:0x00e2, code lost:
    
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).mentor.onDone(((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).ctx);
     */
    /* JADX WARN: Code restructure failed: missing block: B:155:0x00f0, code lost:
    
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).lock.lock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:156:0x0107, code lost:
    
        org.swisspush.redisques.performance.UpperBoundParallel.log.debug("Release remaining {} tokens", java.lang.Integer.valueOf(((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).numTokensAvailForOurself));
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).limit.release(((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).numTokensAvailForOurself);
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).numTokensAvailForOurself = 0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:158:0x0144, code lost:
    
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).worker = null;
        r0 = ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).lock;
        r0.unlock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:159:0x0151, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:161:0x00fb, code lost:
    
        r9 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:162:0x00fc, code lost:
    
        ((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).lock.lock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:163:0x0106, code lost:
    
        throw r9;
     */
    /* JADX WARN: Code restructure failed: missing block: B:164:0x012b, code lost:
    
        org.swisspush.redisques.performance.UpperBoundParallel.log.trace("return for now (hasMore = {}, numInProgress = {})", java.lang.Boolean.valueOf(((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).hasMore), java.lang.Integer.valueOf(((org.swisspush.redisques.performance.UpperBoundParallel.Request) r7).numInProgress));
     */
    /* JADX WARN: Finally extract failed */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder: the real body could not be reconstructed, so in this file the
     * method unconditionally throws {@link UnsupportedOperationException}. As a consequence
     * {@code request(...)} and the handler scheduled by {@code onOneDone} both fail when they
     * reach this method in the decompiled form.
     *
     * <p>The JADX fragments listed directly above this method show parts of the original logic:
     * when {@code numInProgress} is 0 and {@code isDoneCalled} is still false, the flag is set,
     * the request lock is released around the call to {@code mentor.onDone(ctx)} and re-acquired
     * afterwards (also when that call throws); the remaining {@code numTokensAvailForOurself}
     * tokens are released to {@code limit} and the counter is reset to 0; finally {@code worker}
     * is cleared and the lock is released before returning.
     *
     * <p>NOTE(review): the conditions and ordering that connect those fragments are not
     * recoverable from this file - consult the original sources or the bytecode before relying
     * on this summary.
     *
     * @param r7 the request to continue processing
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private <Ctx> void resume(final org.swisspush.redisques.performance.UpperBoundParallel.Request<Ctx> r7) {
        /*
            Method dump skipped, instructions count: 1019
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.swisspush.redisques.performance.UpperBoundParallel.resume(org.swisspush.redisques.performance.UpperBoundParallel$Request):void");
    }

    /**
     * Bookkeeping for one finished unit of work.
     *
     * <p>Under the request lock: decrements {@code numInProgress}, credits one token to
     * {@code numTokensAvailForOurself} and, when assertions are enabled, verifies that the
     * in-progress counter did not become negative. If a throwable was reported, the lock is
     * released while {@code mentor.onError} runs and re-acquired afterwards; when the mentor
     * returns {@code false} or throws, the request is flagged fatal and one token is handed back
     * to the semaphore immediately. In every case the lock is released at the end and
     * {@code resume(request)} is scheduled through {@code vertx.runOnContext}.
     *
     * @param request the request the finished unit of work belongs to
     * @param th      failure reported for the unit of work, or {@code null} on success
     * @throws AssertionError if assertions are enabled and {@code numInProgress} dropped below 0
     */
    private <Ctx> void onOneDone(Request<Ctx> request, Throwable th) {
        final Lock guard = request.lock;
        guard.lock();
        try {
            request.numInProgress -= 1;
            request.numTokensAvailForOurself += 1;
            final String outcomeLabel = (th == null) ? "null" : "ex";
            log.trace("onOneDone({})  {} remaining", outcomeLabel, request.numInProgress);
            if (request.numInProgress < 0 && !$assertionsDisabled) {
                throw new AssertionError(request.numInProgress + " >= 0  (BTW: mentor MUST call 'onDone' EXACTLY once)");
            }
            if (th == null) {
                // Success: nothing to report to the mentor; the outer finally does the rest.
                return;
            }
            try {
                // The mentor is called without our lock held; it is taken back right after.
                guard.unlock();
                if (log.isDebugEnabled()) {
                    log.debug("mentor.onError({}: {})", th.getClass().getName(), th.getMessage());
                }
                final boolean fatal = !request.mentor.onError(th, request.ctx);
                guard.lock();
                request.isFatalError = fatal;
                if (fatal) {
                    // Give the token back now instead of keeping it credited to this request.
                    request.numTokensAvailForOurself -= 1;
                    request.limit.release();
                }
            } catch (Throwable failure) {
                // Anything thrown while the lock was released counts as fatal: restore the
                // lock, return the token and let the failure propagate to our caller.
                guard.lock();
                request.isFatalError = true;
                request.numTokensAvailForOurself -= 1;
                request.limit.release();
                throw failure;
            }
        } finally {
            guard.unlock();
            this.vertx.runOnContext(ignored -> resume(request));
        }
    }

    static {
        $assertionsDisabled = !UpperBoundParallel.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(UpperBoundParallel.class);
    }
}
