package es.iti.wakamiti.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import es.iti.commons.jext.Extension;
import es.iti.wakamiti.api.WakamitiAPI;
import es.iti.wakamiti.api.WakamitiException;
import es.iti.wakamiti.api.annotations.I18nResource;
import es.iti.wakamiti.api.annotations.Step;
import es.iti.wakamiti.api.annotations.TearDown;
import es.iti.wakamiti.api.extensions.StepContributor;
import es.iti.wakamiti.api.plan.Document;
import es.iti.wakamiti.api.util.WakamitiLogger;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;

@I18nResource("iti_wakamiti_wakamiti-amqp")
@Extension(provider = "es.iti.wakamiti", name = "amqp-steps", version = "1.1")
/* loaded from: input_file:es/iti/wakamiti/amqp/AmqpStepContributor.class */
public class AmqpStepContributor implements StepContributor {
    private AmqpConnectionParams connectionParams;
    private Connection connection;
    private Channel channel;
    private String destination;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;
    private final Logger logger = WakamitiLogger.forClass(AmqpStepContributor.class);
    private final Map<String, List<String>> receivedMessages = new HashMap();

    public void setConnectionParams(AmqpConnectionParams amqpConnectionParams) {
        this.connectionParams = amqpConnectionParams;
    }

    protected Channel channel() {
        if (this.channel == null) {
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setUri(this.connectionParams.host());
                connectionFactory.setUsername(this.connectionParams.username());
                connectionFactory.setPassword(this.connectionParams.password());
                this.connection = connectionFactory.newConnection();
                this.channel = this.connection.createChannel();
            } catch (IOException | URISyntaxException | GeneralSecurityException | TimeoutException e) {
                throw new WakamitiException("Error connecting to AMQP server: {}", new Object[]{e.getMessage(), e});
            }
        }
        return this.channel;
    }

    public void setDurable(boolean z) {
        this.durable = z;
    }

    public void setExclusive(boolean z) {
        this.exclusive = z;
    }

    public void setAutoDelete(boolean z) {
        this.autoDelete = z;
    }

    @TearDown
    public void releaseConnection() {
        try {
            if (this.channel != null) {
                this.channel.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (IOException | TimeoutException e) {
            this.logger.warn("There were problems releasing the connection: {}", e.getMessage());
            this.logger.debug(e.toString(), e);
        }
    }

    void sendJsonMessageToQueue(String str, String str2) {
        sendTextMessageToQueue(str, str2, "application/json");
    }

    void sendTextMessageToQueue(String str, String str2, String str3) {
        try {
            declareQueue(str);
            byte[] bytes = str2.getBytes(StandardCharsets.UTF_8);
            channel().basicPublish("", str, new AMQP.BasicProperties(str3, (String) null, (Map) null, (Integer) null, (Integer) null, (String) null, (String) null, (String) null, (String) null, (Date) null, (String) null, (String) null, (String) null, (String) null), bytes);
        } catch (IOException e) {
            throw new WakamitiException(e);
        }
    }

    void consumeQueue(final String str) {
        try {
            declareQueue(str);
            channel().basicConsume(str, false, new DefaultConsumer(channel()) { // from class: es.iti.wakamiti.amqp.AmqpStepContributor.1
                public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    AmqpStepContributor.this.receivedMessages.computeIfAbsent(str, str3 -> {
                        return new ArrayList();
                    }).add(new String(bArr, StandardCharsets.UTF_8));
                    AmqpStepContributor.this.channel().basicAck(envelope.getDeliveryTag(), true);
                }
            });
        } catch (IOException e) {
            throw new WakamitiException(e);
        }
    }

    private boolean messageExistsInReceived(String str) {
        Objects.requireNonNull(this.destination, "Destination queue is not defined");
        return this.receivedMessages.computeIfAbsent(this.destination, str2 -> {
            return new ArrayList();
        }).stream().anyMatch(str3 -> {
            return str3.equals(str);
        });
    }

    private void checkMessageExistsInReceived(String str, Long l) {
        try {
            Awaitility.await().atMost(l.longValue(), TimeUnit.SECONDS).pollInterval(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> {
                return Boolean.valueOf(messageExistsInReceived(str));
            });
        } catch (ConditionTimeoutException e) {
            throw new AssertionError("Message not received in " + l + " seconds");
        }
    }

    private void declareQueue(String str) throws IOException {
        channel().queueDeclare(str, this.durable, this.exclusive, this.autoDelete, Map.of());
    }

    private String readFile(File file) {
        return WakamitiAPI.instance().resourceLoader().readFileAsString(file);
    }

    @Step(value = "amqp.define.connection.parameters", args = {"url:text", "username:text", "password:text"})
    public void defineConnectionParameters(String str, String str2, String str3) {
        this.connectionParams = new AmqpConnectionParams(str, str2, str3);
    }

    @Step(value = "amqp.define.destination.queue", args = {"word"})
    public void defineDestinationQueue(String str) {
        this.destination = str;
        consumeQueue(str);
    }

    @Step(value = "amqp.send.json.from.string", args = {"word"})
    public void sendJSONFromString(String str, Document document) {
        sendJsonMessageToQueue(str, document.getContent());
    }

    @Step(value = "amqp.send.json.from.file", args = {"queue:word", "file:file"})
    public void sendJSONFromFile(String str, File file) {
        sendJsonMessageToQueue(str, readFile(file));
    }

    @Step("amqp.send.await")
    public void awaitFor(Long l) {
        Awaitility.await().timeout(l.longValue() + 1, TimeUnit.SECONDS).pollDelay(l.longValue(), TimeUnit.SECONDS).until(() -> {
            return true;
        });
    }

    @Step("amqp.check.received.json.from.string")
    public void checkReceivedJSONFromString(Long l, Document document) {
        checkMessageExistsInReceived(document.getContent(), l);
    }

    @Step(value = "amqp.check.received.json.from.file", args = {"seconds:integer", "file:file"})
    public void checkReceivedJSONFromString(Long l, File file) {
        checkMessageExistsInReceived(readFile(file), l);
    }
}
