package org.nutz.integration.zbus;

import io.zbus.mq.Broker;
import io.zbus.mq.Consumer;
import io.zbus.mq.ConsumerConfig;
import io.zbus.mq.Message;
import io.zbus.mq.MessageHandler;
import io.zbus.mq.MqClient;
import io.zbus.mq.ProducerConfig;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.nutz.integration.zbus.mq.ZBusConsumer;
import org.nutz.integration.zbus.mq.ZBusProducer;
import org.nutz.ioc.Ioc;
import org.nutz.ioc.impl.PropertiesProxy;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.lang.Streams;
import org.nutz.log.Log;
import org.nutz.log.Logs;

@IocBean(name = "zbus", create = "init", depose = "close")
/* loaded from: input_file:org/nutz/integration/zbus/ZBusFactory.class */
public class ZBusFactory implements Closeable {
    private static final Log log = Logs.get();
    protected Set<Consumer> consumers = new HashSet();
    protected Map<String, ZBusProducer> producers = new ConcurrentHashMap();
    protected Object lock = new Object();
    protected Broker broker;

    @Inject("refer:$ioc")
    protected Ioc ioc;

    @Inject
    protected PropertiesProxy conf;

    public ZBusProducer getProducer(String str) {
        ZBusProducer zBusProducer = this.producers.get(str);
        if (zBusProducer == null) {
            synchronized (this.lock) {
                zBusProducer = this.producers.get(str);
                if (zBusProducer == null) {
                    zBusProducer = new ZBusProducer(new ProducerConfig(this.broker), str);
                    this.producers.put(str, zBusProducer);
                }
            }
        }
        return zBusProducer;
    }

    public void init() {
        log.debug("zbus ...");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    public void addConsumer(Class<?> cls) {
        MessageHandler messageHandler;
        ZBusConsumer zBusConsumer = (ZBusConsumer) cls.getAnnotation(ZBusConsumer.class);
        if (zBusConsumer != null && zBusConsumer.enable()) {
            proxy(fromAnnotation(this.broker, zBusConsumer), (MessageHandler) this.ioc.get(cls));
        }
        for (final Method method : cls.getMethods()) {
            ZBusConsumer zBusConsumer2 = (ZBusConsumer) method.getAnnotation(ZBusConsumer.class);
            if (zBusConsumer2 != null && zBusConsumer2.enable()) {
                ConsumerConfig fromAnnotation = fromAnnotation(this.broker, zBusConsumer2);
                final Object obj = this.ioc.get(cls);
                switch (method.getParameterTypes().length) {
                    case 0:
                        messageHandler = new MessageHandler() { // from class: org.nutz.integration.zbus.ZBusFactory.1
                            public void handle(Message message, MqClient mqClient) throws IOException {
                                try {
                                    method.invoke(obj, new Object[0]);
                                } catch (Exception e) {
                                    throw new IOException(e);
                                }
                            }
                        };
                        break;
                    case 1:
                        messageHandler = new MessageHandler() { // from class: org.nutz.integration.zbus.ZBusFactory.2
                            public void handle(Message message, MqClient mqClient) throws IOException {
                                try {
                                    method.invoke(obj, message);
                                } catch (Exception e) {
                                    throw new IOException(e);
                                }
                            }
                        };
                        break;
                    case 2:
                        messageHandler = new MessageHandler() { // from class: org.nutz.integration.zbus.ZBusFactory.3
                            public void handle(Message message, MqClient mqClient) throws IOException {
                                try {
                                    method.invoke(obj, message, mqClient);
                                } catch (Exception e) {
                                    throw new IOException(e);
                                }
                            }
                        };
                        break;
                    default:
                        throw new RuntimeException("method[" + method + "] not good");
                }
                proxy(fromAnnotation, messageHandler);
            }
        }
    }

    protected static ConsumerConfig fromAnnotation(Broker broker, ZBusConsumer zBusConsumer) {
        ConsumerConfig consumerConfig = new ConsumerConfig(broker);
        consumerConfig.setTopic(zBusConsumer.topic());
        consumerConfig.setVerbose(zBusConsumer.verbose());
        return consumerConfig;
    }

    protected void proxy(ConsumerConfig consumerConfig, MessageHandler messageHandler) {
        Consumer consumer = new Consumer(consumerConfig);
        try {
            consumer.start(messageHandler);
            this.consumers.add(consumer);
        } catch (Exception e) {
            Streams.safeClose(consumer);
            throw new RuntimeException("create Consumer fail obj=" + messageHandler.getClass().getName(), e);
        }
    }

    public void publish(String str, Object obj) {
        getProducer(str).async(obj);
    }
}
