package io.zbus.mq;

import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.Broker;
import io.zbus.transport.ServerAddress;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/zbus/mq/Consumer.class */
public class Consumer extends MqAdmin implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Consumer.class);
    private Broker.ServerSelector consumeServerSelector;
    protected String topic;
    protected ConsumeGroup consumeGroup;
    protected Integer consumeWindow;
    protected int consumeTimeout;
    private ExecutorService consumeRunner;
    private MessageHandler messageHandler;
    private int connectionCount;
    private int consumeRunnerPoolSize;
    private int maxInFlightMessage;
    private boolean started;
    private Map<ServerAddress, ConsumeThreadGroup> consumeThreadGroupMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zbus/mq/Consumer$ConsumeThreadGroup.class */
    public class ConsumeThreadGroup implements Closeable {
        private ConsumeThread[] threads;

        ConsumeThreadGroup(MqClientPool mqClientPool) {
            this.threads = new ConsumeThread[Consumer.this.connectionCount];
            for (int i = 0; i < Consumer.this.connectionCount; i++) {
                ConsumeThread consumeThread = new ConsumeThread(mqClientPool.createClient());
                this.threads[i] = consumeThread;
                consumeThread.setTopic(Consumer.this.topic);
                consumeThread.setConsumeGroup(Consumer.this.consumeGroup);
                consumeThread.setToken(Consumer.this.token);
                consumeThread.setConsumeRunner(Consumer.this.consumeRunner);
                consumeThread.setConsumeTimeout(Consumer.this.consumeTimeout);
                consumeThread.setMessageHandler(Consumer.this.messageHandler);
            }
        }

        public void start(boolean z) {
            for (ConsumeThread consumeThread : this.threads) {
                consumeThread.start(z);
            }
        }

        public void pause() {
            for (ConsumeThread consumeThread : this.threads) {
                consumeThread.pause();
            }
        }

        public void resume() {
            for (ConsumeThread consumeThread : this.threads) {
                consumeThread.resume();
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            for (ConsumeThread consumeThread : this.threads) {
                consumeThread.close();
                consumeThread.getClient().close();
            }
        }
    }

    /* loaded from: input_file:io/zbus/mq/Consumer$DefaultConsumeServerSelector.class */
    public static class DefaultConsumeServerSelector implements Broker.ServerSelector {
        @Override // io.zbus.mq.Broker.ServerSelector
        public ServerAddress[] select(BrokerRouteTable brokerRouteTable, Message message) {
            return (ServerAddress[]) brokerRouteTable.serverTable().keySet().toArray(new ServerAddress[0]);
        }
    }

    public Consumer(ConsumerConfig consumerConfig) {
        super(consumerConfig);
        this.consumeThreadGroupMap = new ConcurrentHashMap();
        this.topic = consumerConfig.getTopic();
        this.consumeGroup = consumerConfig.getConsumeGroup();
        if (this.consumeGroup == null) {
            this.consumeGroup = new ConsumeGroup();
            this.consumeGroup.setGroupName(this.topic);
        }
        if (this.consumeGroup.getMask() == null) {
            this.consumeGroup.setMask(consumerConfig.getTopicMask());
        }
        this.consumeWindow = consumerConfig.getConsumeWindow();
        this.consumeTimeout = consumerConfig.getConsumeTimeout();
        this.messageHandler = consumerConfig.getMessageHandler();
        this.consumeRunnerPoolSize = consumerConfig.getConsumeRunnerPoolSize();
        this.connectionCount = consumerConfig.getConnectionCount();
        this.maxInFlightMessage = consumerConfig.getMaxInFlightMessage();
        this.consumeServerSelector = consumerConfig.getConsumeServerSelector();
        if (this.consumeServerSelector == null) {
            this.consumeServerSelector = new DefaultConsumeServerSelector();
        }
    }

    public synchronized void start() throws IOException {
        start(false);
    }

    public synchronized void start(final boolean z) throws IOException {
        if (this.started) {
            return;
        }
        if (this.messageHandler == null) {
            throw new IllegalArgumentException("ConsumeHandler and MessageProcessor are both null");
        }
        int i = this.consumeRunnerPoolSize;
        this.consumeRunner = new ThreadPoolExecutor(i, i, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(this.maxInFlightMessage), new ThreadPoolExecutor.CallerRunsPolicy());
        Message message = new Message();
        message.setTopic(this.topic);
        for (MqClientPool mqClientPool : this.broker.selectClient(this.consumeServerSelector, message)) {
            startConsumeThreadGroup(mqClientPool, z);
        }
        this.broker.addServerNotifyListener(new Broker.ServerNotifyListener() { // from class: io.zbus.mq.Consumer.1
            @Override // io.zbus.mq.Broker.ServerNotifyListener
            public void onServerLeave(ServerAddress serverAddress) {
                ConsumeThreadGroup consumeThreadGroup = (ConsumeThreadGroup) Consumer.this.consumeThreadGroupMap.remove(serverAddress);
                if (consumeThreadGroup != null) {
                    try {
                        Consumer.log.info("Server(" + serverAddress + ") left, clear consumeThreads connecting to it");
                        consumeThreadGroup.close();
                    } catch (IOException e) {
                        Consumer.log.error(e.getMessage(), e);
                    }
                }
            }

            @Override // io.zbus.mq.Broker.ServerNotifyListener
            public void onServerJoin(MqClientPool mqClientPool2) {
                Consumer.this.startConsumeThreadGroup(mqClientPool2, z);
            }
        });
        this.started = true;
    }

    public void pause() {
        Iterator<ConsumeThreadGroup> it = this.consumeThreadGroupMap.values().iterator();
        while (it.hasNext()) {
            it.next().pause();
        }
    }

    public void resume() {
        Iterator<ConsumeThreadGroup> it = this.consumeThreadGroupMap.values().iterator();
        while (it.hasNext()) {
            it.next().resume();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startConsumeThreadGroup(MqClientPool mqClientPool, boolean z) {
        if (this.consumeThreadGroupMap.containsKey(mqClientPool.serverAddress())) {
            return;
        }
        ConsumeThreadGroup consumeThreadGroup = new ConsumeThreadGroup(mqClientPool);
        this.consumeThreadGroupMap.put(mqClientPool.serverAddress(), consumeThreadGroup);
        consumeThreadGroup.start(z);
    }

    public void start(MessageHandler messageHandler) throws IOException {
        setMessageHandler(messageHandler);
        start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.consumeThreadGroupMap != null) {
            Iterator<ConsumeThreadGroup> it = this.consumeThreadGroupMap.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.consumeThreadGroupMap.clear();
            this.consumeThreadGroupMap = null;
        }
        if (this.consumeRunner != null) {
            this.consumeRunner.shutdown();
            this.consumeRunner = null;
        }
    }

    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    public Broker.ServerSelector getConsumeServerSelector() {
        return this.consumeServerSelector;
    }

    public void setConsumeServerSelector(Broker.ServerSelector serverSelector) {
        this.consumeServerSelector = serverSelector;
    }
}
