package org.nutz.integration.zbus;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.nutz.integration.zbus.annotation.ZBusConsumer;
import org.nutz.integration.zbus.annotation.ZBusInvoker;
import org.nutz.integration.zbus.annotation.ZBusService;
import org.nutz.ioc.Ioc;
import org.nutz.lang.Streams;
import org.nutz.lang.Strings;
import org.nutz.lang.util.NutMap;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.nutz.resource.Scans;
import org.zbus.broker.Broker;
import org.zbus.mq.Consumer;
import org.zbus.mq.MqConfig;
import org.zbus.mq.Protocol;
import org.zbus.net.Client;
import org.zbus.net.http.Message;
import org.zbus.rpc.RpcProcessor;

/* loaded from: input_file:org/nutz/integration/zbus/ZBusFactory.class */
/**
 * Wires zbus consumers and producers into a Nutz Ioc application.
 *
 * <p>Classes and public methods annotated with {@link ZBusConsumer} are
 * discovered by package scanning and each one is started as a zbus
 * {@link Consumer}. Producers are created on demand, one per name, and cached.
 * The {@code broker}, {@code ioc} and {@code pkgs} fields are not assigned
 * anywhere in this class; they have to be set before {@link #init()} or
 * {@link #getProducer(String)} is used.
 */
public class ZBusFactory {
    private static final Log log = Logs.get();

    /** Consumers started through {@link #proxy}; released by {@link #close()}. */
    protected Set<Consumer> consumers = new HashSet<Consumer>();

    /** Producers created so far, keyed by the name passed to {@link #getProducer(String)}. */
    protected Map<String, ZBusProducer> producers = new ConcurrentHashMap<String, ZBusProducer>();

    /** Guards producer creation so that a name is only constructed once. */
    protected Object lock = new Object();

    protected Broker broker;
    protected Ioc ioc;

    /** Packages scanned by {@link #init()}; may be {@code null}. */
    protected List<String> pkgs;

    /**
     * Returns the producer registered under the given name, creating and
     * caching it on first use.
     *
     * @param str name handed to the {@link ZBusProducer} constructor and used as cache key
     * @return the cached or newly created producer
     */
    public ZBusProducer getProducer(String str) {
        ZBusProducer producer = this.producers.get(str);
        if (producer != null) {
            return producer;
        }
        synchronized (this.lock) {
            // Look again under the lock: another thread may have created it meanwhile.
            producer = this.producers.get(str);
            if (producer == null) {
                producer = new ZBusProducer(this.broker, str, Protocol.MqMode.MQ, Protocol.MqMode.PubSub);
                this.producers.put(str, producer);
            }
            return producer;
        }
    }

    /**
     * Scans the packages listed in {@code pkgs}, if any, for consumers.
     */
    public void init() {
        if (this.pkgs != null) {
            init(this.pkgs.toArray(new String[0]));
        }
    }

    /**
     * Scans the given packages and registers every class found through
     * {@link #addConsumer(Class)}.
     *
     * @param strArr package names; a {@code null} array and {@code null} entries are ignored
     */
    public void init(String... strArr) {
        if (strArr == null) {
            return;
        }
        for (String pkg : strArr) {
            if (pkg == null) {
                continue;
            }
            for (Class<?> cls : Scans.me().scanPackage(pkg)) {
                addConsumer(cls);
            }
        }
    }

    /**
     * Closes every consumer started by this factory and forgets them.
     *
     * <p>When a consumer fails to close normally, its heartbeat executor is
     * shut down reflectively as a fallback. A failure of that fallback does
     * not stop the remaining consumers from being closed; the first such
     * failure is rethrown once all consumers have been processed.
     *
     * @throws Exception the first failure raised by the reflective fallback
     */
    public void close() throws Exception {
        Exception firstFailure = null;
        for (Consumer consumer : this.consumers) {
            try {
                consumer.close();
            } catch (Exception e) {
                log.warn("close zbus consumer fail, try to shutdown its heartbeat", e);
                try {
                    shutdownHeartbeat(consumer);
                } catch (Exception fallbackFailure) {
                    if (firstFailure == null) {
                        firstFailure = fallbackFailure;
                    }
                }
            }
        }
        // Drop the references so a repeated close() does not touch closed consumers again.
        this.consumers.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Shuts down the executor held in the {@code heartbeator} field of the
     * consumer's {@code client}, if both are present.
     *
     * <p>The private fields are resolved here, on the fallback path only, so a
     * missing field cannot prevent the regular close of other consumers.
     */
    private static void shutdownHeartbeat(Consumer consumer) throws Exception {
        Field clientField = Consumer.class.getDeclaredField("client");
        clientField.setAccessible(true);
        Client client = (Client) clientField.get(consumer);
        if (client == null) {
            return;
        }
        Field heartbeatField = Client.class.getDeclaredField("heartbeator");
        heartbeatField.setAccessible(true);
        ScheduledExecutorService heartbeat = (ScheduledExecutorService) heartbeatField.get(client);
        if (heartbeat != null && !heartbeat.isShutdown()) {
            heartbeat.shutdown();
        }
    }

    /**
     * Registers the consumers declared by a class.
     *
     * <p>If the class itself carries an enabled {@link ZBusConsumer}, its Ioc
     * bean is cast to {@link Consumer.ConsumerHandler} and started. In
     * addition, every public method carrying an enabled {@link ZBusConsumer}
     * is started as its own consumer that invokes the method on the Ioc bean.
     *
     * @param cls class to inspect
     * @throws RuntimeException if an annotated method takes more than two
     *         parameters, or if a consumer cannot be started
     */
    public void addConsumer(Class<?> cls) {
        ZBusConsumer classLevel = cls.getAnnotation(ZBusConsumer.class);
        if (classLevel != null && classLevel.enable()) {
            proxy(fromAnnotation(this.broker, classLevel), (Consumer.ConsumerHandler) this.ioc.get(cls));
        }
        for (Method method : cls.getMethods()) {
            ZBusConsumer methodLevel = method.getAnnotation(ZBusConsumer.class);
            if (methodLevel == null || !methodLevel.enable()) {
                continue;
            }
            MqConfig config = fromAnnotation(this.broker, methodLevel);
            Object target = this.ioc.get(cls);
            proxy(config, handlerFor(method, target));
        }
    }

    /**
     * Builds a handler that forwards each delivery to {@code method} on
     * {@code target}. Depending on the declared parameter count the method
     * receives nothing, the message, or the message and the consumer.
     *
     * @throws RuntimeException if the method declares more than two parameters
     */
    private static Consumer.ConsumerHandler handlerFor(final Method method, final Object target) {
        final int argCount = method.getParameterTypes().length;
        if (argCount > 2) {
            throw new RuntimeException("method[" + method + "] not good");
        }
        return new Consumer.ConsumerHandler() {
            public void handle(Message message, Consumer consumer) throws IOException {
                Object[] args;
                switch (argCount) {
                    case 0:
                        args = new Object[0];
                        break;
                    case 1:
                        args = new Object[]{message};
                        break;
                    default:
                        args = new Object[]{message, consumer};
                        break;
                }
                try {
                    method.invoke(target, args);
                } catch (Exception e) {
                    // The handler contract only allows IOException; keep the cause attached.
                    throw new IOException(e);
                }
            }
        };
    }

    /**
     * Copies the settings of a {@link ZBusConsumer} annotation into a new
     * {@link MqConfig} bound to the given broker.
     *
     * @param broker broker to set on the config
     * @param zBusConsumer annotation supplying mode, mq, topic and verbose
     * @return the populated config
     */
    protected static MqConfig fromAnnotation(Broker broker, ZBusConsumer zBusConsumer) {
        MqConfig config = new MqConfig();
        config.setBroker(broker);
        config.setMode(zBusConsumer.mode());
        config.setMq(zBusConsumer.mq());
        config.setTopic(zBusConsumer.topic());
        config.setVerbose(zBusConsumer.verbose());
        return config;
    }

    /**
     * Creates a consumer for the given config, starts it with the handler and
     * remembers it for {@link #close()}.
     *
     * @param mqConfig consumer configuration
     * @param consumerHandler handler that receives the deliveries
     * @throws RuntimeException if the consumer cannot be created or started;
     *         the consumer is closed before the exception is thrown
     */
    protected void proxy(MqConfig mqConfig, Consumer.ConsumerHandler consumerHandler) {
        Consumer consumer = new Consumer(mqConfig);
        try {
            // createMQ() is only called when the config is in MQ mode.
            if (mqConfig.getMode() == Protocol.MqMode.MQ.intValue()) {
                consumer.createMQ();
            }
            consumer.start(consumerHandler);
            this.consumers.add(consumer);
        } catch (Exception e) {
            Streams.safeClose(consumer);
            throw new RuntimeException("create Consumer fail obj=" + consumerHandler.getClass().getName(), e);
        }
    }

    /**
     * Scans the given packages and adds the Ioc bean of every class annotated
     * with {@link ZBusService} to the RPC processor.
     *
     * @param rpcProcessor processor receiving the service beans
     * @param ioc container the beans are fetched from
     * @param strArr package names; a {@code null} array and {@code null}
     *        entries are ignored, matching {@link #init(String...)}
     */
    public static void buildServices(RpcProcessor rpcProcessor, Ioc ioc, String... strArr) {
        if (strArr == null) {
            return;
        }
        for (String pkg : strArr) {
            if (pkg == null) {
                continue;
            }
            for (Class<?> cls : Scans.me().scanPackage(pkg)) {
                if (cls.getAnnotation(ZBusService.class) != null) {
                    rpcProcessor.addModule(new Object[]{ioc.get(cls)});
                }
            }
        }
    }

    /**
     * Adds a bean definition for a {@link ZBusInvoker}-annotated class to the
     * given map; classes without the annotation are ignored.
     *
     * <p>The bean name is the annotation value or, when that is blank, the
     * simple class name with its first letter lower-cased. The definition
     * uses the factory {@code $rpc#getService} with the class name as its
     * only argument.
     *
     * @param cls candidate invoker type
     * @param map bean definitions keyed by bean name; receives the new entry
     */
    public static void addInovker(Class<?> cls, Map<String, Map<String, Object>> map) {
        ZBusInvoker invoker = cls.getAnnotation(ZBusInvoker.class);
        if (invoker == null) {
            return;
        }
        String beanName = invoker.value();
        if (Strings.isBlank(beanName)) {
            beanName = Strings.lowerFirst(cls.getSimpleName());
        }
        log.debugf("define zbus Invoker bean name=%s type=%s", beanName, cls.getName());
        NutMap definition = new NutMap().setv("factory", "$rpc#getService");
        definition.setv("args", new String[]{cls.getName()});
        definition.setv("type", cls.getName());
        map.put(beanName, definition);
    }

    /**
     * Sends an object asynchronously through the producer named {@code str}.
     *
     * @param str producer name, see {@link #getProducer(String)}
     * @param obj payload handed to the producer
     */
    public void mq(String str, Object obj) {
        getProducer(str).async(obj);
    }

    /**
     * Sends an object asynchronously through the producer named {@code str}.
     * In this class the implementation is identical to {@link #mq(String, Object)}.
     *
     * @param str producer name, see {@link #getProducer(String)}
     * @param obj payload handed to the producer
     */
    public void publish(String str, Object obj) {
        getProducer(str).async(obj);
    }
}
