package io.hoplin.rpc;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.hoplin.AckStrategy;
import io.hoplin.AcknowledgmentStrategies;
import io.hoplin.ConsumerErrorStrategy;
import io.hoplin.DeadLetterErrorStrategy;
import io.hoplin.HoplinRuntimeException;
import io.hoplin.MessageContext;
import io.hoplin.MessagePayload;
import io.hoplin.json.JsonCodec;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/rpc/RpcResponderConsumer.class */
public class RpcResponderConsumer<I, O> extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(RpcResponderConsumer.class);
    private final Executor executor;
    private final Function<I, O> handler;
    private JsonCodec codec;
    private ConsumerErrorStrategy errorStrategy;

    public RpcResponderConsumer(Channel channel, Function<I, O> function, Executor executor) {
        super(channel);
        this.executor = (Executor) Objects.requireNonNull(executor);
        this.handler = (Function) Objects.requireNonNull(function);
        this.codec = new JsonCodec();
        this.errorStrategy = new DeadLetterErrorStrategy(channel);
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        log.info("RPC handleDelivery Envelope   : {}", envelope);
        log.info("RPC handleDelivery Properties : {}", basicProperties);
        CompletableFuture.supplyAsync(() -> {
            return dispatch(bArr);
        }, this.executor).whenComplete((bArr2, th) -> {
            MessageContext create = MessageContext.create(str, envelope, basicProperties);
            byte[] bArr2 = bArr2;
            if (th != null) {
                try {
                    log.warn("Error dispatching message : {}", th);
                    bArr2 = createErrorMessage(th);
                } catch (Exception e) {
                    log.error("Unable to acknowledge execution", e);
                    return;
                }
            }
            AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().correlationId(basicProperties.getCorrelationId()).build();
            String replyTo = basicProperties.getReplyTo();
            log.info("replyTo, correlationId :  {}, {}", replyTo, basicProperties.getCorrelationId());
            getChannel().basicPublish("", replyTo, build, bArr2);
            AckStrategy.acknowledge(getChannel(), create, AcknowledgmentStrategies.BASIC_ACK.strategy());
        });
    }

    private byte[] dispatch(byte[] bArr) {
        try {
            try {
                return this.codec.serialize(new MessagePayload(this.handler.apply(((MessagePayload) this.codec.deserialize(bArr, MessagePayload.class)).getPayload())), MessagePayload.class);
            } catch (Exception e) {
                log.warn("Handling message error : {} ", e);
                return this.codec.serialize(MessagePayload.error(e), MessagePayload.class);
            }
        } catch (Exception e2) {
            log.error("Unable to apply reply handler", e2);
            throw new HoplinRuntimeException("Unable to apply reply handler", e2);
        }
    }

    private byte[] createErrorMessage(Throwable th) {
        try {
            return this.codec.serialize(MessagePayload.error(th), MessagePayload.class);
        } catch (Exception e) {
            log.error("Unable to serialize message", e);
            throw new HoplinRuntimeException("Unable to serialize message", e);
        }
    }
}
