package uy.kohesive.vertx.sqs;

import com.amazonaws.auth.AWSCredentialsProvider;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.IntIterator;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.MutablePropertyReference1Impl;
import kotlin.jvm.internal.Reflection;
import kotlin.properties.Delegates;
import kotlin.properties.ReadWriteProperty;
import kotlin.ranges.IntRange;
import kotlin.reflect.KProperty;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import uy.kohesive.vertx.sqs.impl.SqsClientImpl;

/* compiled from: SqsSequentialQueueConsumerVerticle.kt */
@Metadata(mv = {1, 1, 9}, bv = {1, 0, 2}, k = 1, d1 = {"��V\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u000b\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u000b\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0010\b\n��\n\u0002\u0010\t\n\u0002\b\u0003\u0018��2\u00020\u00012\u00020\u0002B\u000f\b\u0016\u0012\u0006\u0010\u0003\u001a\u00020\u0004¢\u0006\u0002\u0010\u0005B\u0005¢\u0006\u0002\u0010\u0006J\u0016\u0010#\u001a\u00020$2\f\u0010%\u001a\b\u0012\u0004\u0012\u00020'0&H\u0016J\u0016\u0010(\u001a\u00020$2\f\u0010)\u001a\b\u0012\u0004\u0012\u00020'0&H\u0016J8\u0010*\u001a\u00020$2\u0006\u0010+\u001a\u00020,2\u0006\u0010-\u001a\u00020,2\u0006\u0010.\u001a\u00020/2\u0006\u00100\u001a\u0002012\u0006\u00102\u001a\u00020/2\u0006\u00103\u001a\u000201H\u0002R+\u0010\t\u001a\u00020\b2\u0006\u0010\u0007\u001a\u00020\b8V@VX\u0096\u008e\u0002¢\u0006\u0012\n\u0004\b\u000e\u0010\u000f\u001a\u0004\b\n\u0010\u000b\"\u0004\b\f\u0010\rR\u001c\u0010\u0003\u001a\u0004\u0018\u00010\u0004X\u0096\u000e¢\u0006\u000e\n��\u001a\u0004\b\u0010\u0010\u0011\"\u0004\b\u0012\u0010\u0005R\u001c\u0010\u0013\u001a\n \u0015*\u0004\u0018\u00010\u00140\u0014X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0016\u0010\u0017R+\u0010\u0019\u001a\u00020\u00182\u0006\u0010\u0007\u001a\u00020\u00188B@BX\u0082\u008e\u0002¢\u0006\u0012\n\u0004\b\u001e\u0010\u000f\u001a\u0004\b\u001a\u0010\u001b\"\u0004\b\u001c\u0010\u001dR+\u0010\u001f\u001a\u00020\u00182\u0006\u0010\u0007\u001a\u00020\u00188B@BX\u0082\u008e\u0002¢\u0006\u0012\n\u0004\b\"\u0010\u000f\u001a\u0004\b \u0010\u001b\"\u0004\b!\u0010\u001d¨\u00064"}, d2 = {"Luy/kohesive/vertx/sqs/SqsSequentialQueueConsumerVerticle;", "Lio/vertx/core/AbstractVerticle;", "Luy/kohesive/vertx/sqs/SqsVerticle;", "credentialsProvider", "Lcom/amazonaws/auth/AWSCredentialsProvider;", 
"(Lcom/amazonaws/auth/AWSCredentialsProvider;)V", "()V", "<set-?>", "Luy/kohesive/vertx/sqs/SqsClient;", "client", "getClient", "()Luy/kohesive/vertx/sqs/SqsClient;", "setClient", "(Luy/kohesive/vertx/sqs/SqsClient;)V", "client$delegate", "Lkotlin/properties/ReadWriteProperty;", "getCredentialsProvider", "()Lcom/amazonaws/auth/AWSCredentialsProvider;", "setCredentialsProvider", "log", "Lio/vertx/core/logging/Logger;", "kotlin.jvm.PlatformType", "getLog", "()Lio/vertx/core/logging/Logger;", "Ljava/util/concurrent/ExecutorService;", "pollingPool", "getPollingPool", "()Ljava/util/concurrent/ExecutorService;", "setPollingPool", "(Ljava/util/concurrent/ExecutorService;)V", "pollingPool$delegate", "routingPool", "getRoutingPool", "setRoutingPool", "routingPool$delegate", "start", "", "startFuture", "Lio/vertx/core/Future;", "Ljava/lang/Void;", "stop", "stopFuture", "subscribe", "queueUrl", "", "address", "workersCount", "", "timeout", "", "bufferSize", "pollingInterval", "vertx-sqs"})
/* loaded from: input_file:uy/kohesive/vertx/sqs/SqsSequentialQueueConsumerVerticle.class */
/**
 * Verticle that pulls messages from a single SQS queue and forwards them, one at a time per
 * worker, to an event-bus address.
 *
 * <p>One polling thread refills an in-memory buffer whenever it runs empty; {@code workersCount}
 * routing threads take messages from that buffer, send each one over the event bus and wait for
 * the reply (or the send timeout) before taking the next. A message is deleted from the queue
 * only when the event-bus consumer replies successfully.
 *
 * <p>Configuration keys read in {@link #start(Future)}:
 * <ul>
 *   <li>{@code queueUrl} (required) - queue to poll</li>
 *   <li>{@code address} (required) - event-bus address that receives the messages</li>
 *   <li>{@code workersCount} (required, positive) - number of routing threads</li>
 *   <li>{@code timeout} - event-bus send timeout in ms; defaults to the {@link SqsVerticle}
 *       default timeout</li>
 *   <li>{@code bufferSize} - messages requested per poll; defaults to {@code workersCount * 10}
 *       and is capped at 10</li>
 *   <li>{@code pollingInterval} - pause between polling iterations in ms; defaults to 1000</li>
 * </ul>
 */
public final class SqsSequentialQueueConsumerVerticle extends AbstractVerticle implements SqsVerticle {
    static final /* synthetic */ KProperty[] $$delegatedProperties = {(KProperty) Reflection.mutableProperty1(new MutablePropertyReference1Impl(Reflection.getOrCreateKotlinClass(SqsSequentialQueueConsumerVerticle.class), "client", "getClient()Luy/kohesive/vertx/sqs/SqsClient;")), (KProperty) Reflection.mutableProperty1(new MutablePropertyReference1Impl(Reflection.getOrCreateKotlinClass(SqsSequentialQueueConsumerVerticle.class), "pollingPool", "getPollingPool()Ljava/util/concurrent/ExecutorService;")), (KProperty) Reflection.mutableProperty1(new MutablePropertyReference1Impl(Reflection.getOrCreateKotlinClass(SqsSequentialQueueConsumerVerticle.class), "routingPool", "getRoutingPool()Ljava/util/concurrent/ExecutorService;"))};

    /** Upper bound on messages requested per poll; presumably mirrors the SQS per-request limit. */
    private static final int MAX_BUFFER_SIZE = 10;

    /** Pause between polling iterations when {@code pollingInterval} is not configured. */
    private static final long DEFAULT_POLLING_INTERVAL_MS = 1000L;

    /** Pause before the next poll after the queue returned no messages. */
    private static final long EMPTY_POLL_BACKOFF_MS = 5000L;

    /** Extra time a routing worker waits for a reply beyond the event-bus send timeout. */
    private static final long REPLY_WAIT_SLACK_MS = 100L;

    @Nullable
    private AWSCredentialsProvider credentialsProvider;

    @NotNull
    private final ReadWriteProperty client$delegate;
    private final Logger log;
    private final ReadWriteProperty pollingPool$delegate;
    private final ReadWriteProperty routingPool$delegate;

    /** Returns the credentials provider handed to the SQS client, or {@code null} if none was set. */
    @Override // uy.kohesive.vertx.sqs.SqsVerticle
    @Nullable
    public AWSCredentialsProvider getCredentialsProvider() {
        return this.credentialsProvider;
    }

    /** Sets the credentials provider used when the SQS client is created in {@link #start(Future)}. */
    @Override // uy.kohesive.vertx.sqs.SqsVerticle
    public void setCredentialsProvider(@Nullable AWSCredentialsProvider aWSCredentialsProvider) {
        this.credentialsProvider = aWSCredentialsProvider;
    }

    /** Returns the SQS client; the backing not-null delegate rejects reads before it has been set. */
    @Override // uy.kohesive.vertx.sqs.SqsVerticle
    @NotNull
    public SqsClient getClient() {
        return (SqsClient) this.client$delegate.getValue(this, $$delegatedProperties[0]);
    }

    /** Replaces the SQS client. */
    public void setClient(@NotNull SqsClient sqsClient) {
        Intrinsics.checkParameterIsNotNull(sqsClient, "<set-?>");
        this.client$delegate.setValue(this, $$delegatedProperties[0], sqsClient);
    }

    /** Returns the logger used by this verticle. */
    @Override // uy.kohesive.vertx.sqs.SqsVerticle
    public Logger getLog() {
        return this.log;
    }

    private final ExecutorService getPollingPool() {
        return (ExecutorService) this.pollingPool$delegate.getValue(this, $$delegatedProperties[1]);
    }

    private final void setPollingPool(ExecutorService executorService) {
        this.pollingPool$delegate.setValue(this, $$delegatedProperties[1], executorService);
    }

    private final ExecutorService getRoutingPool() {
        return (ExecutorService) this.routingPool$delegate.getValue(this, $$delegatedProperties[2]);
    }

    private final void setRoutingPool(ExecutorService executorService) {
        this.routingPool$delegate.setValue(this, $$delegatedProperties[2], executorService);
    }

    /**
     * Creates the SQS client, validates the configuration, creates the worker pools and, once the
     * client has started, begins polling and routing.
     *
     * @param future completed when the client has started and the workers are submitted; failed
     *     when a required configuration value is missing or invalid, or when the client fails to
     *     start
     */
    public void start(@NotNull final Future<Void> future) {
        Intrinsics.checkParameterIsNotNull(future, "startFuture");
        Vertx vertx = this.vertx;
        Intrinsics.checkExpressionValueIsNotNull(vertx, "vertx");
        JsonObject config = config();
        Intrinsics.checkExpressionValueIsNotNull(config, "config()");
        setClient(new SqsClientImpl(vertx, config, getCredentialsProvider()));

        final String queueUrl = config.getString("queueUrl");
        final String address = config.getString("address");
        Integer configuredWorkers = config.getInteger("workersCount");
        // Validate before anything dereferences these values: a missing entry must fail the
        // deployment with a clear message instead of surfacing later as an NPE or as an
        // exception inside the client-start callback that leaves the future unresolved.
        if (queueUrl == null || address == null || configuredWorkers == null || configuredWorkers.intValue() < 1) {
            future.fail(new IllegalArgumentException(
                "SQS consumer configuration requires 'queueUrl', 'address' and a positive 'workersCount'"));
            return;
        }
        final int workersCount = configuredWorkers.intValue();

        Long configuredTimeout = config.getLong("timeout");
        final long timeout = configuredTimeout != null
            ? configuredTimeout.longValue()
            : SqsVerticle.Companion.getDefaultTimeout();

        Integer configuredBufferSize = config.getInteger("bufferSize");
        int requestedBufferSize = configuredBufferSize != null ? configuredBufferSize.intValue() : workersCount * 10;
        final int bufferSize = Math.min(requestedBufferSize, MAX_BUFFER_SIZE);

        Long configuredInterval = config.getLong("pollingInterval");
        final long pollingInterval = configuredInterval != null
            ? configuredInterval.longValue()
            : DEFAULT_POLLING_INTERVAL_MS;

        setRoutingPool(Executors.newFixedThreadPool(workersCount));
        setPollingPool(Executors.newSingleThreadExecutor());

        getClient().start(new Handler<AsyncResult<Void>>() {
            @Override
            public void handle(AsyncResult<Void> started) {
                if (!started.succeeded()) {
                    future.fail(started.cause());
                    return;
                }
                subscribe(queueUrl, address, workersCount, timeout, bufferSize, pollingInterval);
                future.complete();
            }
        });
    }

    /**
     * Submits the polling task and {@code workersCount} routing tasks that share one buffer.
     *
     * <p>Both kinds of task run until their thread is interrupted, which is how {@link #stop(Future)}
     * terminates them.
     *
     * @param queueUrl queue to poll and to delete processed messages from
     * @param address event-bus address the messages are sent to
     * @param workersCount number of routing tasks to submit
     * @param timeout event-bus send timeout in milliseconds
     * @param bufferSize number of messages requested per poll
     * @param pollingInterval pause between polling iterations in milliseconds
     */
    public final void subscribe(final String queueUrl, final String address, int workersCount, final long timeout, final int bufferSize, final long pollingInterval) {
        final LinkedBlockingQueue<SqsMessage> buffer = new LinkedBlockingQueue<>();

        getPollingPool().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        long pause = pollingInterval;
                        // Only hit SQS when the workers have drained the buffer; back off longer
                        // when the queue itself had nothing to offer.
                        if (buffer.isEmpty() && fetchBatch(queueUrl, bufferSize, buffer)) {
                            pause = EMPTY_POLL_BACKOFF_MS;
                        }
                        Thread.sleep(pause);
                    }
                } catch (InterruptedException interrupted) {
                    // Pool is shutting down: restore the flag and let the thread finish.
                    Thread.currentThread().interrupt();
                }
            }
        });

        final Runnable routingTask = new Runnable() {
            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        routeAndAwaitReply(buffer.take(), queueUrl, address, timeout);
                    }
                } catch (InterruptedException interrupted) {
                    // Pool is shutting down: restore the flag and let the thread finish.
                    Thread.currentThread().interrupt();
                }
            }
        };
        for (int worker = 0; worker < workersCount; worker++) {
            getRoutingPool().execute(routingTask);
        }
    }

    /**
     * Requests one batch from the queue, appends it to {@code buffer} and blocks until the
     * client's callback has run.
     *
     * @return {@code true} when the request succeeded but returned no messages
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private boolean fetchBatch(final String queueUrl, int bufferSize, final LinkedBlockingQueue<SqsMessage> buffer) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicBoolean queueWasEmpty = new AtomicBoolean(false);
        getClient().receiveMessages(queueUrl, Integer.valueOf(bufferSize), new Handler<AsyncResult<List<JsonObject>>>() {
            @Override
            public void handle(AsyncResult<List<JsonObject>> polled) {
                try {
                    if (!polled.succeeded()) {
                        SqsSequentialQueueConsumerVerticle.this.getLog().error("Can't poll messages from " + queueUrl, polled.cause());
                    } else if (polled.result().isEmpty()) {
                        queueWasEmpty.set(true);
                    } else {
                        List<JsonObject> received = polled.result();
                        // Convert the whole batch first so that a malformed entry rejects the
                        // batch before any of it becomes visible to the workers.
                        List<SqsMessage> batch = new ArrayList<>(received.size());
                        for (JsonObject jsonMessage : received) {
                            String receipt = jsonMessage.getString("receiptHandle");
                            Intrinsics.checkExpressionValueIsNotNull(receipt, "jsonMessage.getString(\"receiptHandle\")");
                            batch.add(new SqsMessage(receipt, jsonMessage));
                        }
                        for (SqsMessage message : batch) {
                            buffer.offer(message);
                        }
                    }
                } finally {
                    // Always release the polling thread, even if the callback body threw.
                    done.countDown();
                }
            }
        });
        done.await();
        return queueWasEmpty.get();
    }

    /**
     * Sends one message over the event bus and waits for the reply, bounded by the send timeout
     * plus a small slack. The message is deleted from the queue only on a successful reply.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void routeAndAwaitReply(final SqsMessage message, final String queueUrl, String address, long timeout) throws InterruptedException {
        final CountDownLatch replied = new CountDownLatch(1);
        this.vertx.eventBus().send(address, message.getMessage(), new DeliveryOptions().setSendTimeout(timeout), new Handler<AsyncResult<Message<Void>>>() {
            @Override
            public void handle(@NotNull AsyncResult<Message<Void>> ar) {
                Intrinsics.checkParameterIsNotNull(ar, "ar");
                try {
                    if (ar.succeeded()) {
                        SqsVerticleKt.deleteMessage(SqsSequentialQueueConsumerVerticle.this, queueUrl, message.getReceipt());
                    } else {
                        SqsSequentialQueueConsumerVerticle.this.getLog().warn("Message with receipt " + message.getReceipt() + " was failed to process by the consumer");
                    }
                } finally {
                    // Release the worker even if the delete call throws.
                    replied.countDown();
                }
            }
        });
        replied.await(REPLY_WAIT_SLACK_MS + timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the worker threads and then the SQS client.
     *
     * <p>{@code shutdownNow()} is required here: the polling and routing tasks are endless loops,
     * so a plain {@code shutdown()} would leave their threads running after the verticle stops.
     *
     * @param future completed or failed according to the outcome of stopping the client
     */
    public void stop(@NotNull final Future<Void> future) {
        Intrinsics.checkParameterIsNotNull(future, "stopFuture");
        getRoutingPool().shutdownNow();
        getPollingPool().shutdownNow();
        getClient().stop(new Handler<AsyncResult<Void>>() {
            @Override
            public void handle(AsyncResult<Void> stopped) {
                if (stopped.succeeded()) {
                    future.complete();
                } else {
                    future.fail(stopped.cause());
                }
            }
        });
    }

    /** Creates a verticle without a credentials provider. */
    public SqsSequentialQueueConsumerVerticle() {
        this.client$delegate = Delegates.INSTANCE.notNull();
        this.log = LoggerFactory.getLogger("SqsSequentialQueueConsumerVerticle");
        this.pollingPool$delegate = Delegates.INSTANCE.notNull();
        this.routingPool$delegate = Delegates.INSTANCE.notNull();
    }

    /**
     * Creates a verticle whose SQS client will use the given credentials provider.
     *
     * @param aWSCredentialsProvider credentials provider; must not be {@code null}
     */
    public SqsSequentialQueueConsumerVerticle(@NotNull AWSCredentialsProvider aWSCredentialsProvider) {
        this();
        Intrinsics.checkParameterIsNotNull(aWSCredentialsProvider, "credentialsProvider");
        setCredentialsProvider(aWSCredentialsProvider);
    }
}
