package io.jexxa.infrastructure.messaging.jms;

import io.jexxa.TestConstants;
import io.jexxa.core.JexxaMain;
import io.jexxa.drivingadapter.messaging.JMSAdapter;
import io.jexxa.drivingadapter.messaging.listener.QueueListener;
import io.jexxa.drivingadapter.messaging.listener.TopicListener;
import io.jexxa.infrastructure.MessageSenderManager;
import io.jexxa.infrastructure.messaging.MessageSender;
import io.jexxa.infrastructure.outbox.TransactionalOutboxSender;
import io.jexxa.testapplication.JexxaTestApplication;
import io.jexxa.testapplication.domain.model.JexxaValueObject;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Integration tests for sending messages to JMS queues and topics.
 *
 * <p>Each test obtains a {@link MessageSender} from {@link MessageSenderManager}, sends
 * {@link #message} to a queue or topic, and then waits until the corresponding listener
 * (bound to a {@link JMSAdapter} in {@link #initTests()}) reports a received message.
 * Finally, the application is stopped and the shutdown must complete within
 * {@link #SHUTDOWN_TIMEOUT}.
 *
 * <p>Every test changes the default sender strategy through the static
 * {@link MessageSenderManager#setDefaultStrategy}; execution is pinned to
 * {@link ExecutionMode#SAME_THREAD}.
 *
 * <p>NOTE(review): presumably a reachable JMS broker configured through the application
 * properties is required — confirm against the test setup.
 */
@Execution(ExecutionMode.SAME_THREAD)
@Tag(TestConstants.INTEGRATION_TEST)
class JMSSenderIT {
    /** Name of the factory method that supplies the sender implementations under test. */
    private static final String MESSAGE_SENDER_CONFIG = "getMessageSenderConfig";
    /** Header key carrying the simple class name of the sent message. */
    private static final String TYPE = "type";
    /** Topic the tests publish to. */
    private static final String TOPIC_DESTINATION = "JEXXA_TOPIC";
    /** Maximum time (in seconds) to wait for a listener to report received messages. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 1L;
    /** Maximum time the application may take to stop at the end of a test. */
    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5L);

    private final JexxaValueObject message = new JexxaValueObject(42);
    private TopicListener topicListener;
    private QueueListener queueListener;
    private JexxaMain jexxaMain;

    /** Starts a fresh application with one queue listener and one topic listener bound to JMS. */
    @BeforeEach
    void initTests() {
        jexxaMain = new JexxaMain(JexxaTestApplication.class);
        topicListener = new TopicListener();
        queueListener = new QueueListener();

        jexxaMain.disableBanner()
                .bind(JMSAdapter.class).to(queueListener)
                .bind(JMSAdapter.class).to(topicListener)
                .start();
    }

    /**
     * Supplies the {@link MessageSender} implementations used by the parameterized tests.
     *
     * @return a stream containing {@code JMSSender} and {@code TransactionalOutboxSender}
     */
    static Stream<Class<? extends MessageSender>> getMessageSenderConfig() {
        return Stream.of(JMSSender.class, TransactionalOutboxSender.class);
    }

    /** Sends the message as JSON to the topic and expects the topic listener to receive it. */
    @MethodSource({MESSAGE_SENDER_CONFIG})
    @ParameterizedTest
    void sendMessageToTopic(Class<? extends MessageSender> cls) {
        MessageSenderManager.setDefaultStrategy(cls);
        MessageSender objectUnderTest = MessageSenderManager.getMessageSender(JMSSenderIT.class, jexxaMain.getProperties());

        objectUnderTest.send(message)
                .toTopic(TOPIC_DESTINATION)
                .addHeader(TYPE, message.getClass().getSimpleName())
                .asJson();

        assertTopicMessageReceived();
        assertStopsWithinTimeout();
    }

    /** Sends the message as JSON to the queue and expects the queue listener to receive it. */
    @MethodSource({MESSAGE_SENDER_CONFIG})
    @ParameterizedTest
    void sendMessageToQueue(Class<? extends MessageSender> cls) {
        MessageSenderManager.setDefaultStrategy(cls);
        MessageSender objectUnderTest = MessageSenderManager.getMessageSender(JMSSenderIT.class, jexxaMain.getProperties());

        objectUnderTest.send(message)
                .toQueue(QueueListener.QUEUE_DESTINATION)
                .addHeader(TYPE, message.getClass().getSimpleName())
                .asJson();

        assertQueueMessagesReceived(1);
        assertStopsWithinTimeout();
    }

    /** Sends the message via {@code asString()} to the queue and expects the queue listener to receive it. */
    @MethodSource({MESSAGE_SENDER_CONFIG})
    @ParameterizedTest
    void sendMessageToQueueAsString(Class<? extends MessageSender> cls) {
        MessageSenderManager.setDefaultStrategy(cls);
        MessageSender objectUnderTest = MessageSenderManager.getMessageSender(JMSSenderIT.class, jexxaMain.getProperties());

        objectUnderTest.send(message)
                .toQueue(QueueListener.QUEUE_DESTINATION)
                .addHeader(TYPE, message.getClass().getSimpleName())
                .asString();

        assertQueueMessagesReceived(1);
        assertStopsWithinTimeout();
    }

    /** Sends the message via {@code sendByteMessage} to the topic and expects the topic listener to receive it. */
    @MethodSource({MESSAGE_SENDER_CONFIG})
    @ParameterizedTest
    void sendByteMessageToTopic(Class<? extends MessageSender> cls) {
        MessageSenderManager.setDefaultStrategy(cls);
        MessageSender objectUnderTest = MessageSenderManager.getMessageSender(JMSSenderIT.class, jexxaMain.getProperties());

        objectUnderTest.sendByteMessage(message)
                .toTopic(TOPIC_DESTINATION)
                .addHeader(TYPE, message.getClass().getSimpleName())
                .asJson();

        assertTopicMessageReceived();
        assertStopsWithinTimeout();
    }

    /** Sends the message via {@code sendByteMessage} to the queue and expects the queue listener to receive it. */
    @MethodSource({MESSAGE_SENDER_CONFIG})
    @ParameterizedTest
    void sendByteMessageToQueue(Class<? extends MessageSender> cls) {
        MessageSenderManager.setDefaultStrategy(cls);
        MessageSender objectUnderTest = MessageSenderManager.getMessageSender(JMSSenderIT.class, jexxaMain.getProperties());

        objectUnderTest.sendByteMessage(message)
                .toQueue(QueueListener.QUEUE_DESTINATION)
                .addHeader(TYPE, message.getClass().getSimpleName())
                .asJson();

        assertQueueMessagesReceived(1);
        assertStopsWithinTimeout();
    }

    /**
     * Sends one message, simulates a connection failure on the sender's connection, sends a
     * second message, and expects both messages to arrive at the queue listener.
     *
     * @throws JMSException if closing the sender's connection fails
     */
    @Test
    void sendMessageReconnectQueue() throws JMSException {
        MessageSenderManager.setDefaultStrategy(JMSSender.class);
        // The strategy was just set to JMSSender, so the concrete type is needed to reach getConnection().
        JMSSender objectUnderTest = (JMSSender) MessageSenderManager.getMessageSender(JMSSenderIT.class, jexxaMain.getProperties());

        objectUnderTest.send(message).toQueue(QueueListener.QUEUE_DESTINATION).asJson();
        simulateConnectionException(objectUnderTest.getConnection());
        objectUnderTest.send(message).toQueue(QueueListener.QUEUE_DESTINATION).asJson();

        assertQueueMessagesReceived(2);
        assertStopsWithinTimeout();
    }

    /**
     * Replaces the {@code java.naming.password} property with a {@code java.naming.file.password}
     * entry pointing to a file and expects a message sent with these properties to arrive.
     */
    @Test
    void testPasswordFile() {
        Properties properties = new Properties();
        properties.putAll(jexxaMain.getProperties());
        properties.remove("java.naming.password");
        properties.put("java.naming.file.password", "src/test/resources/secrets/jndiPassword");

        MessageSenderManager.setDefaultStrategy(JMSSender.class);
        MessageSender objectUnderTest = MessageSenderManager.getMessageSender(JMSSenderIT.class, properties);

        objectUnderTest.sendByteMessage(message)
                .toQueue(QueueListener.QUEUE_DESTINATION)
                .addHeader(TYPE, message.getClass().getSimpleName())
                .asJson();

        assertQueueMessagesReceived(1);
        assertStopsWithinTimeout();
    }

    /** Waits until the topic listener holds at least one message and checks the first one is readable. */
    private void assertTopicMessageReceived() {
        Awaitility.await()
                .atMost(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> !topicListener.getMessages().isEmpty());

        Assertions.assertDoesNotThrow(() -> topicListener.getMessages().get(0));
    }

    /**
     * Waits until the queue listener holds at least {@code minimumCount} messages and checks
     * the first one is readable.
     *
     * @param minimumCount number of messages that must have arrived before the wait ends
     */
    private void assertQueueMessagesReceived(int minimumCount) {
        Awaitility.await()
                .atMost(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> queueListener.getMessages().size() >= minimumCount);

        Assertions.assertDoesNotThrow(() -> queueListener.getMessages().get(0));
    }

    /**
     * Stops the application and fails if this takes longer than {@link #SHUTDOWN_TIMEOUT}.
     *
     * <p>NOTE(review): this is only reached when all preceding assertions pass; on an earlier
     * failure the application is left running. Consider an {@code @AfterEach} cleanup once it is
     * confirmed that {@code JexxaMain.stop()} may safely be called twice.
     */
    private void assertStopsWithinTimeout() {
        Assertions.assertTimeout(SHUTDOWN_TIMEOUT, jexxaMain::stop);
    }

    /**
     * Closes the given connection and notifies its registered exception listener with a
     * simulated {@link JMSException}.
     *
     * @param connection connection whose failure is simulated
     * @throws JMSException if reading the listener or closing the connection fails
     */
    private void simulateConnectionException(Connection connection) throws JMSException {
        // Fetch the listener first: it is read before the connection is closed, as in the original flow.
        ExceptionListener exceptionListener = Objects.requireNonNull(
                connection.getExceptionListener(),
                "Connection has no ExceptionListener registered; cannot simulate a connection error");

        connection.close();
        exceptionListener.onException(new JMSException("Simulated error "));
    }
}
