package org.duracloud.common.changenotifier;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.duracloud.common.error.DuraCloudRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/duracloud/common/changenotifier/RabbitMQSubscriptionManager.class */
public class RabbitMQSubscriptionManager implements SubscriptionManager {
    private Channel mqChannel;
    private String mqHost;
    private Integer mqPort;
    private String mqVhost;
    private String queueName;
    private String queueUrl;
    private String mqUsername;
    private String mqPassword;
    private String exchangeName;
    private String consumerName;
    private Logger log = LoggerFactory.getLogger(RabbitMQSubscriptionManager.class);
    private boolean initialized = false;
    private List<MessageListener> messageListeners = new ArrayList();

    public RabbitMQSubscriptionManager(String str, Integer num, String str2, String str3, String str4, String str5, String str6) {
        this.mqHost = str;
        this.mqPort = num;
        this.mqVhost = str2;
        this.exchangeName = str3;
        this.mqUsername = str4;
        this.mqPassword = str5;
        this.queueName = str6;
        this.consumerName = "consumer-" + str6;
    }

    @Override // org.duracloud.common.changenotifier.SubscriptionManager
    public void addListener(MessageListener messageListener) {
        this.messageListeners.add(messageListener);
    }

    @Override // org.duracloud.common.changenotifier.SubscriptionManager
    public synchronized void connect() {
        if (this.initialized) {
            throw new DuraCloudRuntimeException("this manager is already connected");
        }
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername(this.mqUsername);
            connectionFactory.setPassword(this.mqPassword);
            connectionFactory.setVirtualHost(this.mqVhost);
            connectionFactory.setHost(this.mqHost);
            connectionFactory.setPort(this.mqPort.intValue());
            Connection newConnection = connectionFactory.newConnection();
            this.mqChannel = newConnection.createChannel();
            this.queueUrl = "(RabbitMQ) " + newConnection.getAddress();
            this.mqChannel.queueDeclare(this.queueName, true, false, false, (Map) null);
            this.mqChannel.queueBind(this.queueName, this.exchangeName, this.queueName);
            this.log.info("Subscribing consumer {} to queue {} on vhost {} at URL {}", new Object[]{this.consumerName, this.queueName, this.mqVhost, this.queueUrl});
            startConsumer();
        } catch (Exception e) {
            this.initialized = false;
            this.log.error("failed to estabilish connection to RabbitMQ with queue name {} and URL {} because {}", new Object[]{this.queueName, this.queueUrl, e.getMessage()});
            throw new DuraCloudRuntimeException(e);
        }
    }

    private void startConsumer() {
        try {
            this.mqChannel.basicConsume(this.queueName, false, this.consumerName, new DefaultConsumer(this.mqChannel) { // from class: org.duracloud.common.changenotifier.RabbitMQSubscriptionManager.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String str2 = new String(bArr);
                    RabbitMQSubscriptionManager.this.dispatch(str2);
                    RabbitMQSubscriptionManager.this.log.debug("{} dispatched", str2);
                    RabbitMQSubscriptionManager.this.mqChannel.basicAck(deliveryTag, false);
                    RabbitMQSubscriptionManager.this.log.debug("{} deleted", str2);
                }

                public void handleConsumeOk(String str) {
                    RabbitMQSubscriptionManager.this.log.info("Consumer registered: {}", str);
                    RabbitMQSubscriptionManager.this.initialized = true;
                }

                public void handleCancel(String str) {
                    RabbitMQSubscriptionManager.this.log.warn("Consumer has been cancelled unexpectedly: " + str);
                }

                public void handleCancelOk(String str) {
                    RabbitMQSubscriptionManager.this.log.info("Consumer has been cancelled successfully: " + str);
                }

                public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
                    RabbitMQSubscriptionManager.this.log.warn("Either the channel or the underlying connection has been shut down for consumer {} because {}", str, shutdownSignalException.getReason().toString());
                    RabbitMQSubscriptionManager.this.initialized = false;
                }
            });
        } catch (Exception e) {
            this.log.error("Consumer failed to subscribe: " + e.getMessage(), e);
            this.initialized = false;
        }
    }

    private void dispatch(String str) {
        this.log.debug("Dispatching message {}", str);
        for (MessageListener messageListener : this.messageListeners) {
            try {
                messageListener.onMessage(str);
            } catch (Exception e) {
                this.log.error("Failed to dispatch message " + str + " to " + messageListener + "due to " + e.getMessage(), e);
            }
        }
    }

    private void cancelConsumer() {
        try {
            this.mqChannel.basicCancel(this.consumerName);
            this.log.info("Unsubscripbed consumer {}", this.consumerName);
        } catch (IOException e) {
            this.log.error("Error unsubscribing consumer {}", this.consumerName, e);
        }
    }

    private void deleteQueue() {
        try {
            this.mqChannel.queueDelete(this.queueName);
            this.log.info("Deleted queue {}", this.queueName);
        } catch (IOException e) {
            this.log.error("Error deleting queue {}", this.queueName, e);
        }
    }

    @Override // org.duracloud.common.changenotifier.SubscriptionManager
    public void disconnect() {
        if (!this.initialized) {
            throw new DuraCloudRuntimeException("This manager is already disconnected");
        }
        this.log.info("Disconnecting");
        this.log.info("Unsubscribing {}", this.consumerName);
        cancelConsumer();
        this.log.info("Deleting queue {}", this.queueName);
        deleteQueue();
        this.initialized = false;
        this.log.info("Disconnection complete");
    }
}
