package io.hoplin.batch;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import io.hoplin.Binding;
import io.hoplin.HoplinRuntimeException;
import io.hoplin.MessagePayload;
import io.hoplin.RabbitMQClient;
import io.hoplin.RabbitMQOptions;
import io.hoplin.json.JsonCodec;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/batch/DefaultBatchClient.class */
public class DefaultBatchClient implements BatchClient {
    private static final Logger log = LoggerFactory.getLogger(DefaultBatchClient.class);
    private final RabbitMQClient client;
    private JsonCodec codec;
    private final Channel channel;
    private String replyToQueueName;
    private final String exchange;
    private boolean directReply;
    private ConcurrentHashMap<UUID, CompletableFutureWrapperBatchContext> batches = new ConcurrentHashMap<>();
    private BatchReplyConsumer consumer;

    public DefaultBatchClient(RabbitMQOptions rabbitMQOptions, Binding binding) {
        Objects.requireNonNull(rabbitMQOptions);
        Objects.requireNonNull(binding);
        this.client = RabbitMQClient.create(rabbitMQOptions);
        this.channel = this.client.channel();
        this.codec = new JsonCodec();
        this.exchange = binding.getExchange();
        this.replyToQueueName = binding.getQueue();
        bind();
        consumeReply();
    }

    private void bind() {
        if (this.replyToQueueName == null || this.replyToQueueName.isEmpty() || "amq.rabbitmq.reply-to".equalsIgnoreCase(this.replyToQueueName)) {
            this.replyToQueueName = "amq.rabbitmq.reply-to";
            this.directReply = true;
        } else {
            this.replyToQueueName += ".reply-to." + UUID.randomUUID();
        }
        log.info("Param Exchange, ReplyTo, directReply  : {}", new Object[]{this.exchange, this.replyToQueueName, Boolean.valueOf(this.directReply)});
        try {
            if (!this.directReply) {
                this.channel.exchangeDeclare(this.exchange, "direct", true, false, (Map) null);
                this.channel.queueDeclare(this.replyToQueueName, false, false, true, (Map) null);
            }
        } catch (Exception e) {
            throw new HoplinRuntimeException("Unable to bind queue", e);
        }
    }

    @Override // io.hoplin.batch.BatchClient
    public CompletableFuture<BatchContext> startNew(Consumer<BatchContext> consumer) {
        Objects.requireNonNull(consumer);
        CompletableFuture<BatchContext> completableFuture = new CompletableFuture<>();
        BatchContext batchContext = new BatchContext();
        UUID batchId = batchContext.getBatchId();
        this.batches.put(batchId, new CompletableFutureWrapperBatchContext(completableFuture, batchContext));
        consumer.accept(batchContext);
        List<BatchContextTask> submittedTasks = batchContext.getSubmittedTasks();
        int size = submittedTasks.size();
        AtomicLong atomicLong = new AtomicLong();
        for (BatchContextTask batchContextTask : submittedTasks) {
            basicPublish(batchId, batchContextTask, "");
            UUID taskId = batchContextTask.getTaskId();
            atomicLong.incrementAndGet();
            log.info("Added task [{} of {}]: {} : {}", new Object[]{atomicLong, Integer.valueOf(size), taskId, batchContextTask});
        }
        return completableFuture;
    }

    private void basicPublish(UUID uuid, BatchContextTask batchContextTask, String str) {
        if (str == null) {
            throw new IllegalArgumentException("routingKey should not be null");
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Publishing to Exchange = {}, RoutingKey = {} , ReplyTo = {}", new Object[]{this.exchange, str, this.replyToQueueName});
            }
            UUID taskId = batchContextTask.getTaskId();
            HashMap hashMap = new HashMap();
            hashMap.put("x-batch-id", uuid.toString());
            this.channel.basicPublish(this.exchange, str, new AMQP.BasicProperties.Builder().correlationId(taskId.toString()).replyTo(this.replyToQueueName).headers(hashMap).build(), createRequestPayload(batchContextTask.getMessage()));
        } catch (IOException e) {
            log.error("Unable to send request", e);
        }
    }

    @Override // io.hoplin.batch.BatchClient
    public UUID continueWith(UUID uuid, Consumer<BatchContext> consumer) {
        return null;
    }

    @Override // io.hoplin.batch.BatchClient
    public void cancel(UUID uuid) {
    }

    private <I> byte[] createRequestPayload(I i) {
        return this.codec.serialize(new MessagePayload(i));
    }

    private void consumeReply() {
        try {
            this.consumer = new BatchReplyConsumer(this.channel, this.batches);
            this.channel.basicConsume(this.replyToQueueName, true, this.consumer);
        } catch (Exception e) {
            throw new HoplinRuntimeException("Unable to start RPC client reply consumer", e);
        }
    }
}
