package io.zbus.mq;

import io.zbus.mq.Broker;
import io.zbus.mq.Protocol;
import io.zbus.transport.ResultCallback;
import io.zbus.transport.ServerAddress;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:io/zbus/mq/Producer.class */
public class Producer extends MqAdmin {
    private Broker.ServerSelector produceServerSelector;

    /* loaded from: input_file:io/zbus/mq/Producer$DefaultProduceServerSelector.class */
    public class DefaultProduceServerSelector implements Broker.ServerSelector {
        private long lastUpdatedTime = 0;
        private Map<String, List<Protocol.TopicInfo>> cache = new ConcurrentHashMap();
        private int roundRobinIndex = 0;

        public DefaultProduceServerSelector() {
        }

        @Override // io.zbus.mq.Broker.ServerSelector
        public ServerAddress[] select(BrokerRouteTable brokerRouteTable, Message message) {
            if (brokerRouteTable.getLastUpdatedTime() > this.lastUpdatedTime) {
                this.cache.clear();
                this.lastUpdatedTime = brokerRouteTable.getLastUpdatedTime();
            }
            if (brokerRouteTable.serverTable().size() == 0) {
                return new ServerAddress[0];
            }
            String topic = message.getTopic();
            if (topic == null) {
                return new ServerAddress[0];
            }
            List<Protocol.TopicInfo> list = this.cache.get(topic);
            if (list != null && !list.isEmpty()) {
                this.roundRobinIndex++;
                this.roundRobinIndex %= list.size();
                return new ServerAddress[]{list.get(this.roundRobinIndex).serverAddress};
            }
            Map<ServerAddress, Protocol.TopicInfo> map = brokerRouteTable.topicTable().get(topic);
            if (map == null || map.size() == 0) {
                return new ServerAddress[0];
            }
            ArrayList arrayList = new ArrayList();
            Protocol.TopicInfo topicInfo = null;
            Iterator<Map.Entry<ServerAddress, Protocol.TopicInfo>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                Protocol.TopicInfo value = it.next().getValue();
                if (value.consumerCount > 0) {
                    arrayList.add(value);
                }
                if (topicInfo == null) {
                    topicInfo = value;
                } else if (topicInfo.consumerCount < value.consumerCount) {
                    topicInfo = value;
                } else if (topicInfo.consumerCount == value.consumerCount && topicInfo.messageDepth > value.messageDepth) {
                    topicInfo = value;
                }
            }
            if (arrayList.size() > 0) {
                this.cache.put(topic, arrayList);
            }
            return new ServerAddress[]{topicInfo.serverAddress};
        }
    }

    public Producer(ProducerConfig producerConfig) {
        super(producerConfig);
        this.produceServerSelector = producerConfig.getProduceServerSelector();
        if (this.produceServerSelector == null) {
            this.produceServerSelector = new DefaultProduceServerSelector();
        }
    }

    public Producer(Broker broker) {
        this(new ProducerConfig(broker));
    }

    public Message publish(Message message, int i) throws IOException, InterruptedException {
        MqClientPool[] selectClient = this.broker.selectClient(this.produceServerSelector, message);
        if (selectClient.length < 1) {
            throw new MqException("Missing MqClient for publishing message: " + message);
        }
        MqClientPool mqClientPool = selectClient[0];
        MqClient mqClient = null;
        try {
            mqClient = mqClientPool.borrowClient();
            Message produce = configClient(mqClient).produce(message, i);
            mqClientPool.returnClient(mqClient);
            return produce;
        } catch (Throwable th) {
            mqClientPool.returnClient(mqClient);
            throw th;
        }
    }

    public Message publish(Message message) throws IOException, InterruptedException {
        return publish(message, this.invokeTimeout);
    }

    public void publishAsync(Message message, ResultCallback<Message> resultCallback) throws IOException {
        MqClientPool[] selectClient = this.broker.selectClient(this.produceServerSelector, message);
        if (selectClient.length < 1) {
            throw new MqException("Missing MqClient for publishing message: " + message);
        }
        MqClientPool mqClientPool = selectClient[0];
        MqClient mqClient = null;
        try {
            mqClient = mqClientPool.borrowClient();
            configClient(mqClient).produceAsync(message, resultCallback);
            mqClientPool.returnClient(mqClient);
        } catch (Throwable th) {
            mqClientPool.returnClient(mqClient);
            throw th;
        }
    }

    public Broker.ServerSelector getProduceServerSelector() {
        return this.produceServerSelector;
    }

    public void setProduceServerSelector(Broker.ServerSelector serverSelector) {
        this.produceServerSelector = serverSelector;
    }
}
