package io.hoplin.rpc;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import io.hoplin.Binding;
import io.hoplin.HoplinRuntimeException;
import io.hoplin.MessagePayload;
import io.hoplin.RabbitMQClient;
import io.hoplin.RabbitMQOptions;
import io.hoplin.json.JsonCodec;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/rpc/DefaultRpcClient.class */
public class DefaultRpcClient<I, O> implements RpcClient<I, O> {
    private static final Logger log = LoggerFactory.getLogger(DefaultRpcClient.class);
    private final RabbitMQClient client;
    private JsonCodec codec;
    private final Channel channel;
    private String replyToQueueName;
    private final String exchange;
    private RpcCallerConsumer consumer;
    private boolean directReply;

    public DefaultRpcClient(RabbitMQOptions rabbitMQOptions, Binding binding) {
        Objects.requireNonNull(rabbitMQOptions);
        Objects.requireNonNull(binding);
        this.client = RabbitMQClient.create(rabbitMQOptions);
        this.channel = this.client.channel();
        this.codec = new JsonCodec();
        this.exchange = binding.getExchange();
        this.replyToQueueName = binding.getQueue();
        bind();
        consumeReply();
    }

    private void bind() {
        if (this.replyToQueueName == null || this.replyToQueueName.isEmpty() || "amq.rabbitmq.reply-to".equalsIgnoreCase(this.replyToQueueName)) {
            this.replyToQueueName = "amq.rabbitmq.reply-to";
            this.directReply = true;
        } else {
            this.replyToQueueName += ".reply-to." + UUID.randomUUID();
        }
        log.info("Param Exchange    : {}", this.exchange);
        log.info("Param ReplyTo     : {}", this.replyToQueueName);
        log.info("Param directReply : {}", Boolean.valueOf(this.directReply));
        try {
            if (!this.directReply) {
                this.channel.exchangeDeclare(this.exchange, "direct", false, true, (Map) null);
                this.channel.queueDeclare(this.replyToQueueName, false, false, true, (Map) null);
            }
        } catch (Exception e) {
            throw new HoplinRuntimeException("Unable to bind queue", e);
        }
    }

    private void consumeReply() {
        try {
            this.consumer = new RpcCallerConsumer(this.channel);
            this.channel.basicConsume(this.replyToQueueName, true, this.consumer);
        } catch (Exception e) {
            throw new HoplinRuntimeException("Unable to start RPC client reply consumer", e);
        }
    }

    @Override // io.hoplin.rpc.RpcClient
    public O request(I i) {
        return request(i, "", Duration.ZERO);
    }

    @Override // io.hoplin.rpc.RpcClient
    public O request(I i, String str) {
        return request(i, str, Duration.ZERO);
    }

    @Override // io.hoplin.rpc.RpcClient
    public O request(I i, Duration duration) {
        return request(i, "", duration);
    }

    @Override // io.hoplin.rpc.RpcClient
    public O request(I i, String str, Duration duration) {
        try {
            return requestAsync(i, str, duration).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e2) {
            log.error("Execution error", e2);
            return null;
        }
    }

    @Override // io.hoplin.rpc.RpcClient
    public CompletableFuture<O> requestAsync(I i) {
        return requestAsync((DefaultRpcClient<I, O>) i, "");
    }

    @Override // io.hoplin.rpc.RpcClient
    public CompletableFuture<O> requestAsync(I i, String str) {
        return requestAsync(i, str, Duration.ZERO);
    }

    @Override // io.hoplin.rpc.RpcClient
    public CompletableFuture<O> requestAsync(I i, Duration duration) {
        return requestAsync(i, "", duration);
    }

    @Override // io.hoplin.rpc.RpcClient
    public CompletableFuture<O> requestAsync(I i, String str, Duration duration) {
        if (str == null) {
            throw new IllegalArgumentException("routingKey should not be null");
        }
        CompletableFuture<O> completableFuture = new CompletableFuture<>();
        try {
            log.info("Publishing to Exchange = {}, RoutingKey = {} , ReplyTo = {}", new Object[]{this.exchange, str, this.replyToQueueName});
            String uuid = UUID.randomUUID().toString();
            AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().correlationId(uuid).replyTo(this.replyToQueueName).build();
            this.consumer.bind(uuid, completableFuture);
            this.channel.basicPublish(this.exchange, str, build, createRequestPayload(i));
        } catch (IOException e) {
            completableFuture.completeExceptionally(e);
            log.error("Unable to send request", e);
        }
        return completableFuture;
    }

    public static RpcClient create(RabbitMQOptions rabbitMQOptions, Binding binding) {
        Objects.requireNonNull(rabbitMQOptions);
        Objects.requireNonNull(binding);
        return new DefaultRpcClient(rabbitMQOptions, binding);
    }

    private byte[] createRequestPayload(I i) {
        MessagePayload messagePayload = new MessagePayload(i);
        messagePayload.setType(i.getClass());
        return this.codec.serialize(messagePayload);
    }

    public void close() throws IOException {
        if (this.client != null) {
            this.client.disconnect();
        }
    }
}
