package io.hoplin.rpc;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.hoplin.HoplinRuntimeException;
import io.hoplin.MessagePayload;
import io.hoplin.json.JsonCodec;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/rpc/RpcCallerConsumer.class */
public class RpcCallerConsumer extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(RpcCallerConsumer.class);
    private ConcurrentHashMap<String, CompletableFuture> bindings;
    private JsonCodec codec;
    private final Executor executor;
    private boolean strictAction;

    public RpcCallerConsumer(Channel channel, Executor executor) {
        super(channel);
        this.bindings = new ConcurrentHashMap<>();
        this.strictAction = true;
        this.codec = new JsonCodec();
        this.executor = (Executor) Objects.requireNonNull(executor);
    }

    public RpcCallerConsumer(Channel channel) {
        this(channel, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        log.info("RPC handleDelivery Envelope  : {}", envelope);
        log.info("RPC handleDelivery Properties: {}", basicProperties);
        String correlationId = basicProperties.getCorrelationId();
        CompletableFuture remove = this.bindings.remove(correlationId);
        if (this.strictAction && remove == null) {
            throw new HoplinRuntimeException("Reply received without corresponding action : " + correlationId);
        }
        if (this.strictAction || remove != null) {
            handleReply(bArr, remove);
        }
    }

    private void handleReply(byte[] bArr, CompletableFuture<Object> completableFuture) {
        CompletableFuture.runAsync(() -> {
            if (log.isDebugEnabled()) {
                log.debug("reply body : {}", new String(bArr));
            }
            try {
                MessagePayload<?> deserializeReplyPayload = deserializeReplyPayload(bArr);
                if (deserializeReplyPayload.isFailure()) {
                    completableFuture.complete(null);
                }
                completableFuture.complete(deserializeReplyPayload.getPayload());
            } catch (Exception e) {
                log.error("Unable to complete reply action", e);
                completableFuture.completeExceptionally(e);
            }
        }, this.executor);
    }

    public void bind(String str, CompletableFuture<?> completableFuture) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(completableFuture);
        this.bindings.put(str, completableFuture);
    }

    private MessagePayload<?> deserializeReplyPayload(byte[] bArr) {
        return (MessagePayload) this.codec.deserialize(bArr, MessagePayload.class);
    }
}
