package org.swisspush.redisques;

import com.google.common.base.Strings;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.scheduling.PeriodicSkipScheduler;
import org.swisspush.redisques.util.DefaultMemoryUsageProvider;
import org.swisspush.redisques.util.DefaultRedisProvider;
import org.swisspush.redisques.util.DefaultRedisquesConfigurationProvider;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.MemoryUsageProvider;
import org.swisspush.redisques.util.QueueActionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisMonitor;
import org.swisspush.redisques.util.RedisProvider;
import org.swisspush.redisques.util.RedisQuesTimer;
import org.swisspush.redisques.util.RedisUtils;
import org.swisspush.redisques.util.RedisquesAPI;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.RedisquesConfigurationProvider;

/* loaded from: input_file:org/swisspush/redisques/RedisQues.class */
public class RedisQues extends AbstractVerticle {
    private final String uid;
    private MessageConsumer<String> uidMessageConsumer;
    private UpperBoundParallel upperBoundParallel;
    private final Map<String, QueueState> myQueues;
    private static final Logger log;
    private DequeueStatisticCollector dequeueStatisticCollector;
    private QueueStatisticsCollector queueStatisticsCollector;
    private Handler<Void> stoppedHandler;
    private MessageConsumer<String> consumersMessageConsumer;
    private RedisProvider redisProvider;
    private String queuesKey;
    private String queuesPrefix;
    private String consumersPrefix;
    private String locksKey;
    private String queueCheckLastexecKey;
    private int consumerLockTime;
    private RedisQuesTimer timer;
    private MemoryUsageProvider memoryUsageProvider;
    private QueueActionFactory queueActionFactory;
    private RedisquesConfigurationProvider configurationProvider;
    private RedisMonitor redisMonitor;
    private Map<RedisquesAPI.QueueOperation, QueueAction> queueActions;
    private Map<String, DequeueStatistic> dequeueStatistic;
    private boolean dequeueStatisticEnabled;
    private final RedisQuesExceptionFactory exceptionFactory;
    private PeriodicSkipScheduler periodicSkipScheduler;
    private final Semaphore redisMonitoringReqQuota;
    private final Semaphore checkQueueRequestsQuota;
    private final Semaphore queueStatsRequestQuota;
    private final Semaphore getQueuesItemsCountRedisRequestQuota;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.swisspush.redisques.RedisQues$2, reason: invalid class name */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$2.class */
    public class AnonymousClass2 implements Consumer<Runnable> {
        Iterator<Map.Entry<String, QueueState>> iter;
        static final /* synthetic */ boolean $assertionsDisabled;

        AnonymousClass2() {
        }

        @Override // java.util.function.Consumer
        public void accept(final Runnable runnable) {
            this.iter = new HashMap(RedisQues.this.myQueues).entrySet().iterator();
            RedisQues.this.upperBoundParallel.request(RedisQues.this.redisMonitoringReqQuota, this.iter, new UpperBoundParallel.Mentor<Iterator<Map.Entry<String, QueueState>>>() { // from class: org.swisspush.redisques.RedisQues.2.1
                /* renamed from: runOneMore, reason: avoid collision after fix types in other method */
                public boolean runOneMore2(BiConsumer<Throwable, Void> biConsumer, Iterator<Map.Entry<String, QueueState>> it) {
                    AnonymousClass2.this.handleNextQueueOfInterest(biConsumer);
                    return it.hasNext();
                }

                @Override // org.swisspush.redisques.performance.UpperBoundParallel.Mentor
                public boolean onError(Throwable th, Iterator<Map.Entry<String, QueueState>> it) {
                    if (!RedisQues.log.isWarnEnabled()) {
                        return false;
                    }
                    RedisQues.log.warn("TODO error handling", RedisQues.this.exceptionFactory.newException(th));
                    return false;
                }

                @Override // org.swisspush.redisques.performance.UpperBoundParallel.Mentor
                public void onDone(Iterator<Map.Entry<String, QueueState>> it) {
                    runnable.run();
                }

                @Override // org.swisspush.redisques.performance.UpperBoundParallel.Mentor
                public /* bridge */ /* synthetic */ boolean runOneMore(BiConsumer biConsumer, Iterator<Map.Entry<String, QueueState>> it) {
                    return runOneMore2((BiConsumer<Throwable, Void>) biConsumer, it);
                }
            });
        }

        void handleNextQueueOfInterest(BiConsumer<Throwable, Void> biConsumer) {
            while (this.iter.hasNext()) {
                Map.Entry<String, QueueState> next = this.iter.next();
                if (next.getValue() == QueueState.CONSUMING) {
                    checkIfImStillTheRegisteredConsumer(next.getKey(), biConsumer);
                    return;
                }
            }
            biConsumer.accept(null, null);
        }

        void checkIfImStillTheRegisteredConsumer(String str, BiConsumer<Throwable, Void> biConsumer) {
            String str2 = RedisQues.this.consumersPrefix + str;
            RedisQues.log.trace("RedisQues refresh queues get: {}", str2);
            RedisQues.this.redisProvider.redis().onComplete(asyncResult -> {
                if (asyncResult.failed()) {
                    biConsumer.accept(RedisQues.this.exceptionFactory.newException("redisProvider.redis() failed", asyncResult.cause()), null);
                } else {
                    ((RedisAPI) asyncResult.result()).get(str2, asyncResult -> {
                        if (asyncResult.failed()) {
                            Exception newException = RedisQues.this.exceptionFactory.newException("Failed to get queue consumer for queue '" + str + "'", asyncResult.cause());
                            if (!$assertionsDisabled && newException == null) {
                                throw new AssertionError();
                            }
                            biConsumer.accept(newException, null);
                            return;
                        }
                        if (RedisQues.this.uid.equals(Objects.toString(asyncResult.result(), ""))) {
                            RedisQues.log.debug("RedisQues Periodic consumer refresh for active queue {}", str);
                            RedisQues.this.refreshRegistration(str, asyncResult -> {
                                if (asyncResult.failed()) {
                                    biConsumer.accept(RedisQues.this.exceptionFactory.newException("TODO error handling", asyncResult.cause()), null);
                                } else {
                                    RedisQues.this.updateTimestamp(str, asyncResult -> {
                                        biConsumer.accept(asyncResult.succeeded() ? null : RedisQues.this.exceptionFactory.newException("updateTimestamp(" + str + ") failed", asyncResult.cause()), null);
                                    });
                                }
                            });
                        } else {
                            RedisQues.log.debug("RedisQues Removing queue {} from the list", str);
                            RedisQues.this.myQueues.remove(str);
                            RedisQues.this.queueStatisticsCollector.resetQueueFailureStatistics(str, biConsumer);
                        }
                    });
                }
            });
        }

        static {
            $assertionsDisabled = !RedisQues.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.swisspush.redisques.RedisQues$3, reason: invalid class name */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$3.class */
    public class AnonymousClass3 {
        long limit;
        RedisAPI redisAPI;
        AtomicInteger counter;
        Iterator<Response> iter;

        AnonymousClass3() {
        }
    }

    /* loaded from: input_file:org/swisspush/redisques/RedisQues$FailedAsyncResult.class */
    private class FailedAsyncResult<Response> implements AsyncResult<Response> {
        private final Throwable cause;

        private FailedAsyncResult(Throwable th) {
            this.cause = th;
        }

        public Response result() {
            return null;
        }

        public Throwable cause() {
            return this.cause;
        }

        public boolean succeeded() {
            return false;
        }

        public boolean failed() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$QueueState.class */
    public enum QueueState {
        READY,
        CONSUMING
    }

    /* loaded from: input_file:org/swisspush/redisques/RedisQues$RedisQuesBuilder.class */
    public static class RedisQuesBuilder {
        private MemoryUsageProvider memoryUsageProvider;
        private RedisquesConfigurationProvider configurationProvider;
        private RedisProvider redisProvider;
        private RedisQuesExceptionFactory exceptionFactory;
        private Semaphore redisMonitoringReqQuota;
        private Semaphore checkQueueRequestsQuota;
        private Semaphore queueStatsRequestQuota;
        private Semaphore getQueuesItemsCountRedisRequestQuota;

        private RedisQuesBuilder() {
        }

        public RedisQuesBuilder withMemoryUsageProvider(MemoryUsageProvider memoryUsageProvider) {
            this.memoryUsageProvider = memoryUsageProvider;
            return this;
        }

        public RedisQuesBuilder withRedisquesRedisquesConfigurationProvider(RedisquesConfigurationProvider redisquesConfigurationProvider) {
            this.configurationProvider = redisquesConfigurationProvider;
            return this;
        }

        public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) {
            this.redisProvider = redisProvider;
            return this;
        }

        public RedisQuesBuilder withExceptionFactory(RedisQuesExceptionFactory redisQuesExceptionFactory) {
            this.exceptionFactory = redisQuesExceptionFactory;
            return this;
        }

        public RedisQuesBuilder withRedisMonitoringReqQuota(Semaphore semaphore) {
            this.redisMonitoringReqQuota = semaphore;
            return this;
        }

        public RedisQuesBuilder withCheckQueueRequestsQuota(Semaphore semaphore) {
            this.checkQueueRequestsQuota = semaphore;
            return this;
        }

        public RedisQuesBuilder withQueueStatsRequestQuota(Semaphore semaphore) {
            this.queueStatsRequestQuota = semaphore;
            return this;
        }

        public RedisQuesBuilder withGetQueuesItemsCountRedisRequestQuota(Semaphore semaphore) {
            this.getQueuesItemsCountRedisRequestQuota = semaphore;
            return this;
        }

        public RedisQues build() {
            if (this.exceptionFactory == null) {
                this.exceptionFactory = RedisQuesExceptionFactory.newThriftyExceptionFactory();
            }
            if (this.redisMonitoringReqQuota == null) {
                this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
                RedisQues.log.warn("No redis request limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
            }
            if (this.checkQueueRequestsQuota == null) {
                this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
                RedisQues.log.warn("No redis check queue limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
            }
            if (this.queueStatsRequestQuota == null) {
                this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
                RedisQues.log.warn("No redis queue stats limit provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
            }
            if (this.getQueuesItemsCountRedisRequestQuota == null) {
                this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
                RedisQues.log.warn("No redis getQueueItemsCount quota provided. Fallback to legacy behavior of {}.", Integer.MAX_VALUE);
            }
            return new RedisQues(this.memoryUsageProvider, this.configurationProvider, this.redisProvider, this.exceptionFactory, this.redisMonitoringReqQuota, this.checkQueueRequestsQuota, this.queueStatsRequestQuota, this.getQueuesItemsCountRedisRequestQuota);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$Task.class */
    public class Task {
        private final String queueName;
        private final DequeueStatistic dequeueStatistic;

        Task(String str, DequeueStatistic dequeueStatistic) {
            this.queueName = str;
            this.dequeueStatistic = dequeueStatistic;
        }

        Future<Void> execute() {
            return RedisQues.this.vertx.executeBlocking(promise -> {
                RedisQues.this.dequeueStatisticCollector.setDequeueStatistic(this.queueName, this.dequeueStatistic).onComplete(asyncResult -> {
                    if (asyncResult.failed()) {
                        RedisQues.log.error("Future that should always succeed has failed, ignore it", asyncResult.cause());
                    }
                    promise.complete();
                });
            });
        }
    }

    public RedisQues() {
        this.uid = UUID.randomUUID().toString();
        this.myQueues = new HashMap();
        this.stoppedHandler = null;
        this.queueActions = new HashMap();
        this.dequeueStatistic = new ConcurrentHashMap();
        this.dequeueStatisticEnabled = false;
        this.exceptionFactory = RedisQuesExceptionFactory.newThriftyExceptionFactory();
        log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
        this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
        this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
        this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
        this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
    }

    public RedisQues(MemoryUsageProvider memoryUsageProvider, RedisquesConfigurationProvider redisquesConfigurationProvider, RedisProvider redisProvider, RedisQuesExceptionFactory redisQuesExceptionFactory, Semaphore semaphore, Semaphore semaphore2, Semaphore semaphore3, Semaphore semaphore4) {
        this.uid = UUID.randomUUID().toString();
        this.myQueues = new HashMap();
        this.stoppedHandler = null;
        this.queueActions = new HashMap();
        this.dequeueStatistic = new ConcurrentHashMap();
        this.dequeueStatisticEnabled = false;
        this.memoryUsageProvider = memoryUsageProvider;
        this.configurationProvider = redisquesConfigurationProvider;
        this.redisProvider = redisProvider;
        this.exceptionFactory = redisQuesExceptionFactory;
        this.redisMonitoringReqQuota = semaphore;
        this.checkQueueRequestsQuota = semaphore2;
        this.queueStatsRequestQuota = semaphore3;
        this.getQueuesItemsCountRedisRequestQuota = semaphore4;
    }

    public static RedisQuesBuilder builder() {
        return new RedisQuesBuilder();
    }

    private void redisSetWithOptions(String str, String str2, boolean z, int i, Handler<AsyncResult<Response>> handler) {
        JsonArray jsonArray = new JsonArray();
        jsonArray.add("EX").add(Integer.valueOf(i));
        if (z) {
            jsonArray.add("NX");
        }
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.send(Command.SET, (String[]) RedisUtils.toPayload(str, str2, jsonArray).toArray(new String[0])).onComplete(handler);
        }).onFailure(th -> {
            handler.handle(new FailedAsyncResult(th));
        });
    }

    private void handleRegistrationRequest(Message<String> message) {
        String str = (String) message.body();
        if (str == null) {
            log.warn("Got message without queue name while handleRegistrationRequest.");
        }
        log.debug("RedisQues Got registration request for queue {} from consumer: {}", str, this.uid);
        redisSetWithOptions(this.consumersPrefix + str, this.uid, true, this.consumerLockTime, asyncResult -> {
            if (!asyncResult.succeeded()) {
                log.error("redisSetWithOptions failed", asyncResult.cause());
                return;
            }
            String response = asyncResult.result() != null ? ((Response) asyncResult.result()).toString() : null;
            log.trace("RedisQues setxn result: {} for queue: {}", response, str);
            if (!"OK".equals(response)) {
                log.debug("RedisQues Missed registration for queue {}", str);
                return;
            }
            log.debug("RedisQues Now registered for queue {}", str);
            this.myQueues.put(str, QueueState.READY);
            consume(str);
        });
    }

    public void start(Promise<Void> promise) {
        log.info("Started with UID {}", this.uid);
        if (this.configurationProvider == null) {
            this.configurationProvider = new DefaultRedisquesConfigurationProvider(this.vertx, config());
        }
        if (this.dequeueStatisticCollector == null) {
            this.dequeueStatisticCollector = new DequeueStatisticCollector(this.vertx);
        }
        if (this.periodicSkipScheduler == null) {
            this.periodicSkipScheduler = new PeriodicSkipScheduler(this.vertx);
        }
        RedisquesConfiguration configuration = this.configurationProvider.configuration();
        log.info("Starting Redisques module with configuration: {}", this.configurationProvider.configuration());
        int dequeueStatisticReportIntervalSec = configuration.getDequeueStatisticReportIntervalSec();
        if (dequeueStatisticReportIntervalSec > 0) {
            this.dequeueStatisticEnabled = true;
            Runnable newDequeueStatisticPublisher = newDequeueStatisticPublisher();
            this.vertx.setPeriodic(1000 * dequeueStatisticReportIntervalSec, l -> {
                newDequeueStatisticPublisher.run();
            });
        }
        this.queuesKey = configuration.getRedisPrefix() + "queues";
        this.queuesPrefix = configuration.getRedisPrefix() + "queues:";
        this.consumersPrefix = configuration.getRedisPrefix() + "consumers:";
        this.locksKey = configuration.getRedisPrefix() + "locks";
        this.queueCheckLastexecKey = configuration.getRedisPrefix() + "check:lastexec";
        this.consumerLockTime = 2 * configuration.getRefreshPeriod();
        this.timer = new RedisQuesTimer(this.vertx);
        if (this.redisProvider == null) {
            this.redisProvider = new DefaultRedisProvider(this.vertx, this.configurationProvider);
        }
        this.upperBoundParallel = new UpperBoundParallel(this.vertx);
        this.redisProvider.redis().onComplete(asyncResult -> {
            if (!asyncResult.succeeded()) {
                promise.fail(new Exception(asyncResult.cause()));
            } else {
                initialize();
                promise.complete();
            }
        });
    }

    private void initialize() {
        RedisquesConfiguration configuration = this.configurationProvider.configuration();
        this.queueStatisticsCollector = new QueueStatisticsCollector(this.redisProvider, this.queuesPrefix, this.vertx, this.exceptionFactory, this.redisMonitoringReqQuota, configuration.getQueueSpeedIntervalSec());
        RedisquesHttpRequestHandler.init(this.vertx, configuration, this.queueStatisticsCollector, this.dequeueStatisticCollector, this.exceptionFactory, this.queueStatsRequestQuota);
        if (this.memoryUsageProvider == null) {
            this.memoryUsageProvider = new DefaultMemoryUsageProvider(this.redisProvider, this.vertx, this.configurationProvider.configuration().getMemoryUsageCheckIntervalSec());
        }
        if (!$assertionsDisabled && this.getQueuesItemsCountRedisRequestQuota == null) {
            throw new AssertionError();
        }
        this.queueActionFactory = new QueueActionFactory(this.redisProvider, this.vertx, log, this.queuesKey, this.queuesPrefix, this.consumersPrefix, this.locksKey, this.memoryUsageProvider, this.queueStatisticsCollector, this.exceptionFactory, this.configurationProvider, this.getQueuesItemsCountRedisRequestQuota);
        this.queueActions.put(RedisquesAPI.QueueOperation.addQueueItem, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.addQueueItem));
        this.queueActions.put(RedisquesAPI.QueueOperation.deleteQueueItem, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.deleteQueueItem));
        this.queueActions.put(RedisquesAPI.QueueOperation.deleteAllQueueItems, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.deleteAllQueueItems));
        this.queueActions.put(RedisquesAPI.QueueOperation.bulkDeleteQueues, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.bulkDeleteQueues));
        this.queueActions.put(RedisquesAPI.QueueOperation.replaceQueueItem, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.replaceQueueItem));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueueItem, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueueItem));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueueItems, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueueItems));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueues, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueues));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueuesCount, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueuesCount));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueueItemsCount, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueueItemsCount));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueuesItemsCount, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueuesItemsCount));
        this.queueActions.put(RedisquesAPI.QueueOperation.enqueue, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.enqueue));
        this.queueActions.put(RedisquesAPI.QueueOperation.lockedEnqueue, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.lockedEnqueue));
        this.queueActions.put(RedisquesAPI.QueueOperation.getLock, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getLock));
        this.queueActions.put(RedisquesAPI.QueueOperation.putLock, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.putLock));
        this.queueActions.put(RedisquesAPI.QueueOperation.bulkPutLocks, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.bulkPutLocks));
        this.queueActions.put(RedisquesAPI.QueueOperation.getAllLocks, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getAllLocks));
        this.queueActions.put(RedisquesAPI.QueueOperation.deleteLock, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.deleteLock));
        this.queueActions.put(RedisquesAPI.QueueOperation.bulkDeleteLocks, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.bulkDeleteLocks));
        this.queueActions.put(RedisquesAPI.QueueOperation.deleteAllLocks, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.deleteAllLocks));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueuesSpeed, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueuesSpeed));
        this.queueActions.put(RedisquesAPI.QueueOperation.getQueuesStatistics, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getQueuesStatistics));
        this.queueActions.put(RedisquesAPI.QueueOperation.setConfiguration, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.setConfiguration));
        this.queueActions.put(RedisquesAPI.QueueOperation.getConfiguration, this.queueActionFactory.buildQueueAction(RedisquesAPI.QueueOperation.getConfiguration));
        String address = configuration.getAddress();
        this.vertx.eventBus().consumer(address, operationsHandler());
        this.consumersMessageConsumer = this.vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest);
        this.uidMessageConsumer = this.vertx.eventBus().consumer(this.uid, message -> {
            String str = (String) message.body();
            if (str == null) {
                log.warn("Got event bus msg with empty body! uid={}  address={}  replyAddress={}", new Object[]{this.uid, message.address(), message.replyAddress()});
            }
            log.debug("RedisQues got notification for queue '{}'", str);
            consume(str);
        });
        registerActiveQueueRegistrationRefresh();
        registerQueueCheck();
        registerMetricsGathering(configuration);
    }

    private void registerMetricsGathering(RedisquesConfiguration redisquesConfiguration) {
        String publishMetricsAddress = redisquesConfiguration.getPublishMetricsAddress();
        if (Strings.isNullOrEmpty(publishMetricsAddress)) {
            return;
        }
        this.redisMonitor = new RedisMonitor(this.vertx, this.redisProvider, publishMetricsAddress, redisquesConfiguration.getMetricStorageName(), redisquesConfiguration.getMetricRefreshPeriod());
        this.redisMonitor.start();
    }

    private Runnable newDequeueStatisticPublisher() {
        return new Runnable() { // from class: org.swisspush.redisques.RedisQues.1
            Iterator<Map.Entry<String, DequeueStatistic>> iter;
            long startEpochMs;
            int size;
            final AtomicBoolean isRunning = new AtomicBoolean();
            AtomicInteger i = new AtomicInteger();

            @Override // java.lang.Runnable
            public void run() {
                if (!this.isRunning.compareAndSet(false, true)) {
                    RedisQues.log.warn("Previous publish run still in progress at idx {} of {} since {}ms", new Object[]{this.i, Integer.valueOf(this.size), Long.valueOf(System.currentTimeMillis() - this.startEpochMs)});
                    return;
                }
                try {
                    HashMap hashMap = new HashMap(RedisQues.this.dequeueStatistic);
                    this.size = hashMap.size();
                    this.iter = hashMap.entrySet().iterator();
                    hashMap.forEach((str, dequeueStatistic) -> {
                        if (dequeueStatistic.isMarkedForRemoval()) {
                            RedisQues.this.dequeueStatistic.remove(str);
                        }
                    });
                    this.i.set(0);
                    this.startEpochMs = System.currentTimeMillis();
                    if (this.size > 5000) {
                        RedisQues.log.warn("Going to report {} dequeue statistics towards collector", Integer.valueOf(this.size));
                    } else if (this.size > 500) {
                        RedisQues.log.info("Going to report {} dequeue statistics towards collector", Integer.valueOf(this.size));
                    } else {
                        RedisQues.log.debug("Going to report {} dequeue statistics towards collector", Integer.valueOf(this.size));
                    }
                    resume();
                } catch (Throwable th) {
                    this.isRunning.set(false);
                    throw th;
                }
            }

            void resume() {
                try {
                    ArrayList arrayList = new ArrayList();
                    while (this.iter.hasNext()) {
                        Map.Entry<String, DequeueStatistic> next = this.iter.next();
                        arrayList.add(new Task(next.getKey(), next.getValue()));
                    }
                    ((Future) arrayList.stream().reduce(Future.succeededFuture(new ArrayList()), (future, task) -> {
                        return future.compose(list -> {
                            return task.execute().compose(r5 -> {
                                list.add(r5);
                                this.i.incrementAndGet();
                                return Future.succeededFuture(list);
                            });
                        });
                    }, (future2, future3) -> {
                        return Future.succeededFuture();
                    })).onComplete(asyncResult -> {
                        if (asyncResult.failed()) {
                            RedisQues.log.error("publishing dequeue statistics not complete, just continue", asyncResult.cause());
                        }
                        RedisQues.log.debug("Done publishing {} dequeue statistics. Took {}ms", this.i, Long.valueOf(System.currentTimeMillis() - this.startEpochMs));
                        this.isRunning.set(false);
                    });
                } catch (Throwable th) {
                    this.isRunning.set(false);
                    throw th;
                }
            }
        };
    }

    private void registerActiveQueueRegistrationRefresh() {
        this.periodicSkipScheduler.setPeriodic(this.configurationProvider.configuration().getRefreshPeriod() * 1000, "registerActiveQueueRegistrationRefresh", new AnonymousClass2());
    }

    private Handler<Message<JsonObject>> operationsHandler() {
        return message -> {
            JsonObject jsonObject = (JsonObject) message.body();
            if (jsonObject == null) {
                throw new NullPointerException("Why is body empty? addr=" + message.address() + "  replyAddr=" + message.replyAddress());
            }
            String string = jsonObject.getString(RedisquesAPI.OPERATION);
            log.trace("RedisQues got operation: {}", string);
            RedisquesAPI.QueueOperation fromString = RedisquesAPI.QueueOperation.fromString(string);
            if (fromString == null) {
                unsupportedOperation(string, message);
                return;
            }
            switch (fromString) {
                case check:
                    checkQueues().onFailure(th -> {
                        if (log.isWarnEnabled()) {
                            log.warn("TODO error handling", this.exceptionFactory.newException(th));
                        }
                    });
                    return;
                case reset:
                    resetConsumers();
                    return;
                case stop:
                    gracefulStop(r1 -> {
                    });
                    return;
                default:
                    this.queueActions.getOrDefault(fromString, this.queueActionFactory.buildUnsupportedAction()).execute(message);
                    return;
            }
        };
    }

    int updateQueueFailureCountAndGetRetryInterval(String str, boolean z) {
        int[] retryIntervals;
        if (z) {
            this.queueStatisticsCollector.queueMessageSuccess(str, (th, r5) -> {
                if (th != null) {
                    log.warn("TODO_3q98hq3 error handling", th);
                }
            });
            return 0;
        }
        long queueMessageFailed = this.queueStatisticsCollector.queueMessageFailed(str);
        QueueConfiguration findQueueConfiguration = findQueueConfiguration(str);
        if (findQueueConfiguration == null || (retryIntervals = findQueueConfiguration.getRetryIntervals()) == null || retryIntervals.length <= 0) {
            return this.configurationProvider.configuration().getRefreshPeriod();
        }
        int i = retryIntervals[(int) (queueMessageFailed <= ((long) retryIntervals.length) ? queueMessageFailed - 1 : retryIntervals.length - 1)];
        this.queueStatisticsCollector.setQueueSlowDownTime(str, i);
        return i;
    }

    private void registerQueueCheck() {
        this.vertx.setPeriodic(this.configurationProvider.configuration().getCheckIntervalTimerMs(), l -> {
            this.redisProvider.redis().compose(redisAPI -> {
                return redisAPI.send(Command.SET, new String[]{this.queueCheckLastexecKey, String.valueOf(System.currentTimeMillis()), "NX", "EX", String.valueOf(this.configurationProvider.configuration().getCheckInterval())});
            }).compose(response -> {
                log.info("periodic queue check is triggered now");
                return checkQueues();
            }).onFailure(th -> {
                if (log.isErrorEnabled()) {
                    log.error("TODO error handling", this.exceptionFactory.newException(th));
                }
            });
        });
    }

    private void unsupportedOperation(String str, Message<JsonObject> message) {
        JsonObject jsonObject = new JsonObject();
        String str2 = "QUEUE_ERROR: Unsupported operation received: " + str;
        log.error(str2);
        jsonObject.put(RedisquesAPI.STATUS, RedisquesAPI.ERROR);
        jsonObject.put(RedisquesAPI.MESSAGE, str2);
        message.reply(jsonObject);
    }

    public void stop() {
        unregisterConsumers(true);
        if (this.redisMonitor != null) {
            this.redisMonitor.stop();
            this.redisMonitor = null;
        }
    }

    private void gracefulStop(Handler<Void> handler) {
        this.consumersMessageConsumer.unregister(asyncResult -> {
            this.uidMessageConsumer.unregister(asyncResult -> {
                if (asyncResult.failed()) {
                    log.warn("TODO error handling", this.exceptionFactory.newException("unregister(" + asyncResult + ") failed", asyncResult.cause()));
                }
                unregisterConsumers(false).onComplete(asyncResult -> {
                    if (asyncResult.failed()) {
                        log.warn("TODO error handling", this.exceptionFactory.newException("unregisterConsumers() failed", asyncResult.cause()));
                    }
                    this.stoppedHandler = handler;
                    if (this.myQueues.keySet().isEmpty()) {
                        handler.handle((Object) null);
                    }
                });
            });
        });
    }

    private Future<Void> unregisterConsumers(boolean z) {
        Promise promise = Promise.promise();
        log.debug("RedisQues unregister consumers. force={}", Boolean.valueOf(z));
        ArrayList arrayList = new ArrayList(this.myQueues.size());
        for (Map.Entry<String, QueueState> entry : this.myQueues.entrySet()) {
            Promise promise2 = Promise.promise();
            arrayList.add(promise2.future());
            String key = entry.getKey();
            if (z || entry.getValue() == QueueState.READY) {
                log.trace("RedisQues unregister consumers queue: {}", key);
                refreshRegistration(key, asyncResult -> {
                    if (asyncResult.failed()) {
                        log.warn("TODO error handling", this.exceptionFactory.newException("refreshRegistration(" + key + ") failed", asyncResult.cause()));
                    }
                    String str = this.consumersPrefix + key;
                    log.trace("RedisQues unregister consumers get: {}", str);
                    this.redisProvider.redis().onSuccess(redisAPI -> {
                        redisAPI.get(str, asyncResult -> {
                            if (asyncResult.failed()) {
                                log.warn("Failed to retrieve consumer '{}'.", str, asyncResult.cause());
                            }
                            String objects = Objects.toString(asyncResult.result(), "");
                            log.trace("RedisQues unregister consumers get result: {}", objects);
                            if (this.uid.equals(objects)) {
                                log.debug("RedisQues remove consumer: {}", this.uid);
                                this.myQueues.remove(key);
                            }
                            promise2.complete();
                        });
                    }).onFailure(th -> {
                        log.warn("Failed to retrieve consumer '{}'.", str, th);
                        promise2.complete();
                    });
                });
            } else {
                promise2.complete();
            }
        }
        CompositeFuture.all(arrayList).onComplete(asyncResult2 -> {
            if (asyncResult2.failed()) {
                log.warn("TODO error handling", this.exceptionFactory.newException(asyncResult2.cause()));
            }
            promise.complete();
        });
        return promise.future();
    }

    private void resetConsumers() {
        log.debug("RedisQues Resetting consumers");
        String str = this.consumersPrefix + "*";
        log.trace("RedisQues reset consumers keys: {}", str);
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.keys(str, asyncResult -> {
                if (asyncResult.failed() || asyncResult.result() == null) {
                    log.error("Unable to get redis keys of consumers", asyncResult.cause());
                    return;
                }
                Response response = (Response) asyncResult.result();
                if (response == null || response.size() == 0) {
                    log.debug("No consumers found to reset");
                    return;
                }
                ArrayList arrayList = new ArrayList(response.size());
                Iterator it = response.iterator();
                while (it.hasNext()) {
                    arrayList.add(((Response) it.next()).toString());
                }
                redisAPI.del(arrayList, asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        log.error("Unable to delete redis keys of consumers");
                    } else if (log.isDebugEnabled()) {
                        log.debug("Successfully reset {} consumers", ((Response) asyncResult.result()).toLong());
                    }
                });
            });
        }).onFailure(th -> {
            log.error("Redis: Unable to get redis keys of consumers", th);
        });
    }

    private Future<Void> consume(String str) {
        Promise promise = Promise.promise();
        log.debug("RedisQues Requested to consume queue {}", str);
        refreshRegistration(str, asyncResult -> {
            if (asyncResult.failed()) {
                log.warn("Failed to refresh registration for queue '{}'.", str, asyncResult.cause());
            }
            String str2 = this.consumersPrefix + str;
            log.trace("RedisQues consume get: {}", str2);
            this.redisProvider.redis().onSuccess(redisAPI -> {
                redisAPI.get(str2, asyncResult -> {
                    if (asyncResult.failed()) {
                        log.error("Unable to get consumer for queue " + str, asyncResult.cause());
                        return;
                    }
                    String objects = Objects.toString(asyncResult.result(), "");
                    log.trace("RedisQues refresh registration consumer: {}", objects);
                    if (!this.uid.equals(objects)) {
                        log.debug("Registration for queue {} has changed to {}", str, objects);
                        this.myQueues.remove(str);
                        notifyConsumer(str).onComplete(asyncResult -> {
                            if (asyncResult.failed()) {
                                log.warn("TODO error handling", this.exceptionFactory.newException("notifyConsumer(" + str + ") failed", asyncResult.cause()));
                            }
                            promise.complete();
                        });
                        return;
                    }
                    QueueState queueState = this.myQueues.get(str);
                    log.trace("RedisQues consumer: {} queue: {} state: {}", new Object[]{objects, str, queueState});
                    if (queueState == QueueState.CONSUMING) {
                        log.debug("RedisQues Queue {} is already being consumed", str);
                        promise.complete();
                        return;
                    }
                    this.myQueues.put(str, QueueState.CONSUMING);
                    if (queueState == null) {
                        log.warn("Received request to consume from a queue I did not know about: {}", str);
                    }
                    log.debug("RedisQues Starting to consume queue {}", str);
                    readQueue(str).onComplete(asyncResult2 -> {
                        if (asyncResult2.failed()) {
                            log.warn("TODO error handling", this.exceptionFactory.newException("readQueue(" + str + ") failed", asyncResult2.cause()));
                        }
                        promise.complete();
                    });
                });
            }).onFailure(th -> {
                log.error("Redis: Unable to get consumer for queue " + str, th);
            });
        });
        return promise.future();
    }

    private Future<Boolean> isQueueLocked(String str) {
        Promise promise = Promise.promise();
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.hexists(this.locksKey, str, asyncResult -> {
                if (asyncResult.failed()) {
                    log.warn("Failed to check if queue '{}' is locked. Assume no.", str, asyncResult.cause());
                    promise.complete(Boolean.FALSE);
                } else if (asyncResult.result() == null) {
                    promise.complete(Boolean.FALSE);
                } else {
                    promise.complete(Boolean.valueOf(((Response) asyncResult.result()).toInteger().intValue() == 1));
                }
            });
        }).onFailure(th -> {
            log.warn("Redis: Failed to check if queue '{}' is locked. Assume no.", str, th);
            promise.complete(Boolean.FALSE);
        });
        return promise.future();
    }

    private Future<Void> readQueue(String str) {
        Promise promise = Promise.promise();
        log.trace("RedisQues read queue: {}", str);
        String str2 = this.queuesPrefix + str;
        log.trace("RedisQues read queue lindex: {}", str2);
        isQueueLocked(str).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                throw this.exceptionFactory.newRuntimeException("TODO error handling " + str, asyncResult.cause());
            }
            if (!((Boolean) asyncResult.result()).booleanValue()) {
                this.redisProvider.redis().onSuccess(redisAPI -> {
                    redisAPI.lindex(str2, "0", asyncResult -> {
                        if (asyncResult.failed()) {
                            log.error("Failed to peek queue '{}'", str, asyncResult.cause());
                        }
                        Response response = (Response) asyncResult.result();
                        log.trace("RedisQues read queue lindex result: {}", response);
                        if (response != null) {
                            if (this.dequeueStatisticEnabled) {
                                this.dequeueStatistic.computeIfAbsent(str, str3 -> {
                                    return new DequeueStatistic();
                                });
                                this.dequeueStatistic.get(str).setLastDequeueAttemptTimestamp(Long.valueOf(System.currentTimeMillis()));
                            }
                            processMessageWithTimeout(str, response.toString(), bool -> {
                                int updateQueueFailureCountAndGetRetryInterval = updateQueueFailureCountAndGetRetryInterval(str, bool.booleanValue());
                                if (bool.booleanValue()) {
                                    log.trace("RedisQues read queue lpop: {}", str2);
                                    redisAPI.lpop(Collections.singletonList(str2), asyncResult -> {
                                        if (asyncResult.failed()) {
                                            log.error("Failed to pop from queue '{}'", str, asyncResult.cause());
                                        }
                                        log.debug("RedisQues Message removed, queue {} is ready again", str);
                                        this.myQueues.put(str, QueueState.READY);
                                        Handler handler = r12 -> {
                                            log.trace("RedisQues read queue: {}", str2);
                                            redisAPI.llen(str2, asyncResult -> {
                                                if (asyncResult.succeeded() && asyncResult.result() != null && ((Response) asyncResult.result()).toInteger().intValue() > 0) {
                                                    notifyConsumer(str).onComplete(asyncResult -> {
                                                        if (asyncResult.failed()) {
                                                            log.warn("TODO error handling", this.exceptionFactory.newException("notifyConsumer(" + str + ") failed", asyncResult.cause()));
                                                        }
                                                        promise.complete();
                                                    });
                                                    return;
                                                }
                                                if (asyncResult.failed() && log.isWarnEnabled()) {
                                                    log.warn("TODO error handling", this.exceptionFactory.newException("redisAPI.llen(" + str2 + ") failed", asyncResult.cause()));
                                                }
                                                promise.complete();
                                            });
                                        };
                                        if (this.stoppedHandler != null) {
                                            unregisterConsumers(false).onComplete(asyncResult -> {
                                                if (asyncResult.failed()) {
                                                    log.warn("TODO error handling", this.exceptionFactory.newException("unregisterConsumers() failed", asyncResult.cause()));
                                                }
                                                if (this.myQueues.isEmpty()) {
                                                    this.stoppedHandler.handle((Object) null);
                                                }
                                                handler.handle((Object) null);
                                            });
                                        } else {
                                            handler.handle((Object) null);
                                        }
                                    });
                                } else {
                                    log.debug("RedisQues Processing failed for queue {}", str);
                                    log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", str, Integer.valueOf(updateQueueFailureCountAndGetRetryInterval));
                                    rescheduleSendMessageAfterFailure(str, updateQueueFailureCountAndGetRetryInterval);
                                    promise.complete();
                                }
                            });
                            return;
                        }
                        log.debug("Got a request to consume from empty queue {}", str);
                        this.myQueues.put(str, QueueState.READY);
                        if (this.dequeueStatisticEnabled) {
                            this.dequeueStatistic.computeIfPresent(str, (str4, dequeueStatistic) -> {
                                dequeueStatistic.setMarkedForRemoval();
                                return dequeueStatistic;
                            });
                        }
                        promise.complete();
                    });
                }).onFailure(th -> {
                    log.warn("Redis: Error on readQueue", th);
                    this.myQueues.put(str, QueueState.READY);
                    promise.complete();
                });
                return;
            }
            log.debug("Got a request to consume from locked queue {}", str);
            this.myQueues.put(str, QueueState.READY);
            promise.complete();
        });
        return promise.future();
    }

    private void rescheduleSendMessageAfterFailure(String str, int i) {
        log.trace("RedsQues reschedule after failure for queue: {}", str);
        this.vertx.setTimer(i * 1000, l -> {
            if (this.dequeueStatisticEnabled) {
                this.dequeueStatistic.computeIfPresent(str, (str2, dequeueStatistic) -> {
                    dequeueStatistic.setNextDequeueDueTimestamp(Long.valueOf(System.currentTimeMillis() + (i * 1000)));
                    return dequeueStatistic;
                });
            }
            if (log.isDebugEnabled()) {
                log.debug("RedisQues re-notify the consumer of queue '{}' at {}", str, new Date(System.currentTimeMillis()));
            }
            notifyConsumer(str).onComplete(asyncResult -> {
                if (asyncResult.failed()) {
                    log.warn("TODO error handling", this.exceptionFactory.newException("notifyConsumer(" + str + ") failed", asyncResult.cause()));
                }
                this.myQueues.put(str, QueueState.READY);
            });
        });
    }

    private void processMessageWithTimeout(String str, String str2, Handler<Boolean> handler) {
        long processorDelayMax = this.configurationProvider.configuration().getProcessorDelayMax();
        if (processorDelayMax > 0) {
            log.info("About to process message for queue {} with a maximum delay of {}ms", str, Long.valueOf(processorDelayMax));
        }
        this.timer.executeDelayedMax(processorDelayMax).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                log.error("Delayed execution has failed.", this.exceptionFactory.newException(asyncResult.cause()));
                return;
            }
            String processorAddress = this.configurationProvider.configuration().getProcessorAddress();
            EventBus eventBus = this.vertx.eventBus();
            JsonObject jsonObject = new JsonObject();
            jsonObject.put("queue", str);
            jsonObject.put(RedisquesAPI.PAYLOAD, str2);
            log.trace("RedisQues process message: {} for queue: {} send it to processor: {}", new Object[]{jsonObject, str, processorAddress});
            eventBus.request(processorAddress, jsonObject, new DeliveryOptions().setSendTimeout(this.configurationProvider.configuration().getProcessorTimeout()), asyncResult -> {
                boolean booleanValue;
                if (asyncResult.succeeded()) {
                    booleanValue = RedisquesAPI.OK.equals(((JsonObject) ((Message) asyncResult.result()).body()).getString(RedisquesAPI.STATUS));
                    if (booleanValue && this.dequeueStatisticEnabled) {
                        this.dequeueStatistic.computeIfPresent(str, (str3, dequeueStatistic) -> {
                            dequeueStatistic.setLastDequeueSuccessTimestamp(Long.valueOf(System.currentTimeMillis()));
                            dequeueStatistic.setNextDequeueDueTimestamp(null);
                            return dequeueStatistic;
                        });
                    }
                } else {
                    log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}", new Object[]{this.uid, str, this.exceptionFactory.newException(asyncResult.cause())});
                    booleanValue = Boolean.FALSE.booleanValue();
                }
                handler.handle(Boolean.valueOf(booleanValue));
            });
            updateTimestamp(str, null);
        });
    }

    private Future<Void> notifyConsumer(String str) {
        log.debug("RedisQues Notifying consumer of queue {}", str);
        EventBus eventBus = this.vertx.eventBus();
        Promise promise = Promise.promise();
        String str2 = this.consumersPrefix + str;
        log.trace("RedisQues notify consumer get: {}", str2);
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.get(str2, asyncResult -> {
                if (asyncResult.failed()) {
                    log.warn("Failed to get consumer for queue '{}'", str, new Exception(asyncResult.cause()));
                }
                String objects = Objects.toString(asyncResult.result(), null);
                log.trace("RedisQues got consumer: {}", objects);
                if (objects == null) {
                    log.debug("RedisQues Sending registration request for queue {}", str);
                    eventBus.send(this.configurationProvider.configuration().getAddress() + "-consumers", str);
                    promise.complete();
                } else {
                    log.debug("RedisQues Notifying consumer {} to consume queue {}", objects, str);
                    eventBus.send(objects, str);
                    promise.complete();
                }
            });
        }).onFailure(th -> {
            log.warn("Redis: Failed to get consumer for queue '{}'", str, th);
            promise.complete();
        });
        return promise.future();
    }

    private void refreshRegistration(String str, Handler<AsyncResult<Response>> handler) {
        log.debug("RedisQues Refreshing registration of queue {}, expire in {} s", str, Integer.valueOf(this.consumerLockTime));
        String str2 = this.consumersPrefix + str;
        if (handler == null) {
            throw new RuntimeException("Handler must be set");
        }
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.expire(List.of(str2, String.valueOf(this.consumerLockTime)), handler);
        }).onFailure(th -> {
            log.warn("Redis: Failed to refresh registration of queue {}", str, th);
            handler.handle(new FailedAsyncResult(th));
        });
    }

    private void updateTimestamp(String str, Handler<AsyncResult<Response>> handler) {
        long currentTimeMillis = System.currentTimeMillis();
        log.trace("RedisQues update timestamp for queue: {} to: {}", str, Long.valueOf(currentTimeMillis));
        this.redisProvider.redis().onSuccess(redisAPI -> {
            if (handler == null) {
                redisAPI.zadd(Arrays.asList(this.queuesKey, String.valueOf(currentTimeMillis), str));
            } else {
                redisAPI.zadd(Arrays.asList(this.queuesKey, String.valueOf(currentTimeMillis), str), handler);
            }
        }).onFailure(th -> {
            log.warn("Redis: Error in updateTimestamp", th);
            if (handler != null) {
                handler.handle(new FailedAsyncResult(th));
            }
        });
    }

    private Future<Void> checkQueues() {
        AnonymousClass3 anonymousClass3 = new AnonymousClass3();
        return Future.succeededFuture().compose(r10 -> {
            log.debug("Checking queues timestamps");
            anonymousClass3.limit = System.currentTimeMillis() - ((3 * this.configurationProvider.configuration().getRefreshPeriod()) * 1000);
            return this.redisProvider.redis();
        }).compose(redisAPI -> {
            anonymousClass3.redisAPI = redisAPI;
            Promise promise = Promise.promise();
            redisAPI.zrangebyscore(Arrays.asList(this.queuesKey, "-inf", String.valueOf(anonymousClass3.limit)), promise);
            return promise.future();
        }).compose(response -> {
            if (!$assertionsDisabled && anonymousClass3.counter != null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && anonymousClass3.iter != null) {
                throw new AssertionError();
            }
            anonymousClass3.counter = new AtomicInteger(response.size());
            anonymousClass3.iter = response.iterator();
            log.trace("RedisQues update queues: {}", anonymousClass3.counter);
            final Promise promise = Promise.promise();
            this.upperBoundParallel.request(this.checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor<Void>() { // from class: org.swisspush.redisques.RedisQues.4
                /* renamed from: runOneMore, reason: avoid collision after fix types in other method */
                public boolean runOneMore2(BiConsumer<Throwable, Void> biConsumer, Void r11) {
                    if (anonymousClass3.iter.hasNext()) {
                        String response = anonymousClass3.iter.next().toString();
                        String str = RedisQues.this.queuesPrefix + response;
                        RedisQues.log.trace("RedisQues update queue: {}", str);
                        Handler handler = r9 -> {
                            RedisQues.this.refreshRegistration(response, asyncResult -> {
                                if (asyncResult.failed()) {
                                    RedisQues.log.warn("TODO error handling", RedisQues.this.exceptionFactory.newException("refreshRegistration(" + response + ") failed", asyncResult.cause()));
                                }
                                RedisQues.this.notifyConsumer(response).onComplete(asyncResult -> {
                                    if (asyncResult.failed()) {
                                        RedisQues.log.warn("TODO error handling", RedisQues.this.exceptionFactory.newException("notifyConsumer(" + response + ") failed", asyncResult.cause()));
                                    }
                                    biConsumer.accept(null, null);
                                });
                            });
                        };
                        RedisAPI redisAPI2 = anonymousClass3.redisAPI;
                        List singletonList = Collections.singletonList(str);
                        AnonymousClass3 anonymousClass32 = anonymousClass3;
                        redisAPI2.exists(singletonList, asyncResult -> {
                            if (asyncResult.failed() || asyncResult.result() == null) {
                                RedisQues.log.error("RedisQues is unable to check existence of queue " + response, RedisQues.this.exceptionFactory.newException("redisAPI.exists(" + str + ") failed", asyncResult.cause()));
                                biConsumer.accept(null, null);
                                return;
                            }
                            if (((Response) asyncResult.result()).toLong().longValue() == 1) {
                                RedisQues.log.debug("Updating queue timestamp for queue '{}'", response);
                                RedisQues.this.updateTimestamp(response, asyncResult -> {
                                    if (asyncResult.failed()) {
                                        RedisQues.log.warn("Failed to update timestamps for queue '{}'", response, RedisQues.this.exceptionFactory.newException("updateTimestamp(" + response + ") failed", asyncResult.cause()));
                                    } else if (anonymousClass32.counter.decrementAndGet() == 0) {
                                        RedisQues.this.removeOldQueues(anonymousClass32.limit).onComplete(asyncResult -> {
                                            if (asyncResult.failed() && RedisQues.log.isWarnEnabled()) {
                                                RedisQues.log.warn("TODO error handling", RedisQues.this.exceptionFactory.newException("removeOldQueues(" + anonymousClass32.limit + ") failed", asyncResult.cause()));
                                            }
                                            handler.handle((Object) null);
                                        });
                                    } else {
                                        handler.handle((Object) null);
                                    }
                                });
                                return;
                            }
                            if (RedisQues.log.isTraceEnabled()) {
                                RedisQues.log.trace("RedisQues remove old queue: {}", response);
                            }
                            if (RedisQues.this.dequeueStatisticEnabled) {
                                RedisQues.this.dequeueStatistic.computeIfPresent(response, (str2, dequeueStatistic) -> {
                                    dequeueStatistic.setMarkedForRemoval();
                                    return dequeueStatistic;
                                });
                            }
                            if (anonymousClass32.counter.decrementAndGet() == 0) {
                                RedisQues.this.removeOldQueues(anonymousClass32.limit).onComplete(asyncResult2 -> {
                                    if (asyncResult2.failed() && RedisQues.log.isWarnEnabled()) {
                                        RedisQues.log.warn("TODO error handling", RedisQues.this.exceptionFactory.newException("removeOldQueues(" + anonymousClass32.limit + ") failed", asyncResult2.cause()));
                                    }
                                    RedisQues.this.queueStatisticsCollector.resetQueueFailureStatistics(response, biConsumer);
                                });
                            } else {
                                RedisQues.this.queueStatisticsCollector.resetQueueFailureStatistics(response, biConsumer);
                            }
                        });
                    }
                    return anonymousClass3.iter.hasNext();
                }

                @Override // org.swisspush.redisques.performance.UpperBoundParallel.Mentor
                public boolean onError(Throwable th, Void r7) {
                    RedisQues.log.warn("TODO error handling", RedisQues.this.exceptionFactory.newException(th));
                    return true;
                }

                @Override // org.swisspush.redisques.performance.UpperBoundParallel.Mentor
                public void onDone(Void r4) {
                    anonymousClass3.redisAPI = null;
                    anonymousClass3.counter = null;
                    anonymousClass3.iter = null;
                    promise.complete();
                }

                @Override // org.swisspush.redisques.performance.UpperBoundParallel.Mentor
                public /* bridge */ /* synthetic */ boolean runOneMore(BiConsumer biConsumer, Void r6) {
                    return runOneMore2((BiConsumer<Throwable, Void>) biConsumer, r6);
                }
            });
            return promise.future();
        });
    }

    private Future<Void> removeOldQueues(long j) {
        Promise promise = Promise.promise();
        log.debug("Cleaning old queues");
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.zremrangebyscore(this.queuesKey, "-inf", String.valueOf(j), asyncResult -> {
                if (asyncResult.failed() && log.isWarnEnabled()) {
                    log.warn("TODO error handling", this.exceptionFactory.newException("redisAPI.zremrangebyscore('" + this.queuesKey + "', '-inf', " + j + ") failed", asyncResult.cause()));
                }
                promise.complete();
            });
        }).onFailure(th -> {
            log.warn("Redis: Failed to removeOldQueues", th);
            promise.complete();
        });
        return promise.future();
    }

    private QueueConfiguration findQueueConfiguration(String str) {
        for (QueueConfiguration queueConfiguration : this.configurationProvider.configuration().getQueueConfigurations()) {
            if (queueConfiguration.compiledPattern().matcher(str).matches()) {
                return queueConfiguration;
            }
        }
        return null;
    }

    static {
        $assertionsDisabled = !RedisQues.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(RedisQues.class);
    }
}
