package tech.guyi.component.message.stream.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Resource;
import lombok.NonNull;
import tech.guyi.component.message.stream.api.stream.MessageStream;
import tech.guyi.component.message.stream.api.stream.entry.Message;

/* loaded from: input_file:tech/guyi/component/message/stream/rabbitmq/RabbitmqMessageStream.class */
/**
 * {@link MessageStream} implementation backed by a RabbitMQ channel.
 *
 * <p>Topics are bound to the single queue named by the injected
 * {@link RabbitmqConfiguration}, on the exchange named by that same configuration.
 * Before being used as binding or routing keys, topics are rewritten by
 * {@link #replaceTopic(String)}: {@code /} becomes {@code .} and {@code **} becomes {@code #}.
 *
 * <p>The exchange is never declared by this class; it is presumably expected to exist on the
 * broker already.
 *
 * <p>NOTE(review): the RabbitMQ client calls used here declare checked exceptions
 * ({@code IOException}, {@code TimeoutException}). No handling is visible in this source —
 * confirm how they are meant to propagate (e.g. a build-time annotation that was stripped).
 */
public class RabbitmqMessageStream implements MessageStream {
    /** Channel created by {@link #open(Consumer)}; {@code null} until then. */
    private Channel channel;
    /** Connection created by {@link #open(Consumer)}; {@code null} until then. */
    private Connection connection;
    /** Callback that receives every delivered message; set by {@link #open(Consumer)}. */
    private Consumer<Message> receiver;

    @Resource
    private RabbitmqConfiguration configuration;

    /**
     * Rewrites a stream topic into the key form used for bindings and publishing.
     *
     * @param str topic using {@code /} as separator and {@code **} as wildcard
     * @return the topic with every {@code /} replaced by {@code .} and every {@code **} by {@code #}
     */
    private String replaceTopic(String str) {
        // Literal replacement: same result as the regex form, without compiling a pattern.
        return str.replace("/", ".").replace("**", "#");
    }

    /**
     * @return the fixed identifier of this stream implementation, {@code "rabbitmq"}
     */
    @NonNull
    public String getName() {
        return "rabbitmq";
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>Safe to call when {@link #open(Consumer)} was never called or failed part-way. The
     * connection is closed even if closing the channel throws.
     */
    public void close() {
        try {
            if (this.channel != null) {
                this.channel.close();
            }
        } finally {
            if (this.connection != null) {
                this.connection.close();
            }
        }
    }

    /**
     * Binds the configured queue to the configured exchange for the given topic and starts an
     * auto-acknowledging consumer on that queue.
     *
     * <p>Every delivery is handed to the receiver as a {@link Message} carrying the topic string
     * passed to this method (not the routing key of the delivery) and the raw body bytes.
     *
     * <p>NOTE(review): each call starts another consumer on the same queue, so with several
     * registered topics a delivery may be reported under a different registered topic than the
     * one it matched. Confirm whether the envelope's routing key should be used instead.
     *
     * @param str topic to subscribe to, in the stream's own notation
     */
    public void register(final String str) {
        final String queue = this.configuration.getQueue();
        this.channel.queueBind(queue, this.configuration.getExchange(), replaceTopic(str));
        // Second argument is autoAck=true: deliveries are acknowledged as soon as they are sent.
        this.channel.basicConsume(queue, true, new DefaultConsumer(this.channel) {
            public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                RabbitmqMessageStream.this.receiver.accept(new Message(str, bArr));
            }
        });
    }

    /**
     * Removes the binding between the configured queue and exchange for the given topic.
     * Consumers started by {@link #register(String)} are not cancelled.
     *
     * @param str topic previously passed to {@link #register(String)}
     */
    public void unregister(String str) {
        this.channel.queueUnbind(this.configuration.getQueue(), this.configuration.getExchange(), replaceTopic(str));
    }

    /**
     * Connects to the broker described by the injected configuration, opens a channel and
     * declares the configured queue (non-durable, non-exclusive, not auto-deleted, no arguments).
     *
     * @param consumer callback invoked for every message delivered to a registered topic
     */
    public void open(Consumer<Message> consumer) {
        // Store the callback first: deliveries dereference it, and it was previously never set.
        this.receiver = consumer;

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(this.configuration.getUsername());
        connectionFactory.setPassword(this.configuration.getPassword());
        connectionFactory.setVirtualHost(this.configuration.getVirtualHost());
        connectionFactory.setHost(this.configuration.getHost());
        connectionFactory.setPort(this.configuration.getPort());
        this.connection = connectionFactory.newConnection();
        this.channel = this.connection.createChannel();
        this.channel.queueDeclare(this.configuration.getQueue(), false, false, false, (Map<String, Object>) null);
    }

    /**
     * Publishes the message body to the configured exchange, using the rewritten message topic
     * as routing key and no message properties.
     *
     * @param message message whose topic and bytes are sent
     */
    public void publish(Message message) {
        this.channel.basicPublish(this.configuration.getExchange(), replaceTopic(message.getTopic()), (AMQP.BasicProperties) null, message.getBytes());
    }
}
