package io.zbus.mq;

import io.zbus.kit.JsonKit;
import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.Protocol;
import io.zbus.mq.server.MqServer;
import io.zbus.transport.Client;
import io.zbus.transport.EventLoop;
import io.zbus.transport.ResultCallback;
import io.zbus.transport.ServerAddress;
import io.zbus.transport.Session;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/zbus/mq/Broker.class */
public class Broker implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Broker.class);
    private BrokerRouteTable routeTable;
    private Map<ServerAddress, MqClientPool> poolTable;
    private Map<ServerAddress, MqClient> trackerSubscribers;
    private List<ServerNotifyListener> listeners;
    private EventLoop eventLoop;
    private int clientPoolSize;
    private int readyTimeout;
    private CountDownLatch ready;
    private boolean waitCheck;

    /* loaded from: input_file:io/zbus/mq/Broker$ServerNotifyListener.class */
    public interface ServerNotifyListener {
        void onServerJoin(MqClientPool mqClientPool);

        void onServerLeave(ServerAddress serverAddress);
    }

    /* loaded from: input_file:io/zbus/mq/Broker$ServerSelector.class */
    public interface ServerSelector {
        ServerAddress[] select(BrokerRouteTable brokerRouteTable, Message message);
    }

    public Broker() {
        this(new BrokerConfig());
    }

    public Broker(BrokerConfig brokerConfig) {
        this.routeTable = new BrokerRouteTable();
        this.poolTable = new ConcurrentHashMap();
        this.trackerSubscribers = new ConcurrentHashMap();
        this.listeners = new ArrayList();
        this.clientPoolSize = 32;
        this.readyTimeout = 3000;
        this.ready = new CountDownLatch(1);
        this.waitCheck = true;
        this.eventLoop = new EventLoop();
        this.clientPoolSize = brokerConfig.getClientPoolSize();
        Iterator<ServerAddress> it = brokerConfig.getTrackerList().iterator();
        while (it.hasNext()) {
            addTracker(it.next());
        }
    }

    public Broker(String str) {
        this.routeTable = new BrokerRouteTable();
        this.poolTable = new ConcurrentHashMap();
        this.trackerSubscribers = new ConcurrentHashMap();
        this.listeners = new ArrayList();
        this.clientPoolSize = 32;
        this.readyTimeout = 3000;
        this.ready = new CountDownLatch(1);
        this.waitCheck = true;
        this.eventLoop = new EventLoop();
        for (String str2 : str.split("[,; ]")) {
            String trim = str2.trim();
            if (!trim.isEmpty()) {
                addTracker(trim);
            }
        }
    }

    public Broker(ServerAddress serverAddress) {
        this.routeTable = new BrokerRouteTable();
        this.poolTable = new ConcurrentHashMap();
        this.trackerSubscribers = new ConcurrentHashMap();
        this.listeners = new ArrayList();
        this.clientPoolSize = 32;
        this.readyTimeout = 3000;
        this.ready = new CountDownLatch(1);
        this.waitCheck = true;
        this.eventLoop = new EventLoop();
        addTracker(serverAddress);
    }

    public Broker(MqServer mqServer) {
        this(mqServer, null);
    }

    public Broker(MqServer mqServer, String str) {
        this.routeTable = new BrokerRouteTable();
        this.poolTable = new ConcurrentHashMap();
        this.trackerSubscribers = new ConcurrentHashMap();
        this.listeners = new ArrayList();
        this.clientPoolSize = 32;
        this.readyTimeout = 3000;
        this.ready = new CountDownLatch(1);
        this.waitCheck = true;
        this.eventLoop = new EventLoop();
        addTracker(mqServer, str);
    }

    public void addTracker(MqServer mqServer) {
        addTracker(mqServer, null);
    }

    public void addTracker(MqServer mqServer, String str) {
        ServerAddress serverAddress = new ServerAddress();
        serverAddress.server = mqServer;
        serverAddress.token = str;
        addTracker(serverAddress);
    }

    public void addTracker(String str) {
        addTracker(new ServerAddress(str));
    }

    public void addTracker(ServerAddress serverAddress) {
        final ServerAddress m33clone = serverAddress.m33clone();
        if (this.trackerSubscribers.containsKey(m33clone)) {
            return;
        }
        final MqClient mqClient = new MqClient(m33clone, this.eventLoop);
        this.trackerSubscribers.put(m33clone, mqClient);
        mqClient.onDisconnected(new Client.DisconnectedHandler() { // from class: io.zbus.mq.Broker.1
            @Override // io.zbus.transport.Client.DisconnectedHandler
            public void onDisconnected() throws IOException {
                Broker.log.warn("Disconnected from tracker(%s)", m33clone);
                List<ServerAddress> removeTracker = Broker.this.routeTable.removeTracker(m33clone);
                if (!removeTracker.isEmpty()) {
                    Iterator<ServerAddress> it = removeTracker.iterator();
                    while (it.hasNext()) {
                        Broker.this.removeServer(it.next());
                    }
                }
                mqClient.ensureConnectedAsync();
            }
        });
        mqClient.onConnected(new Client.ConnectedHandler() { // from class: io.zbus.mq.Broker.2
            @Override // io.zbus.transport.Client.ConnectedHandler
            public void onConnected() throws IOException {
                Broker.log.info("Connected to tracker(%s)", m33clone);
                Message message = new Message();
                message.setCommand(Protocol.TRACK_SUB);
                message.setToken(m33clone.getToken());
                mqClient.invokeAsync(message, (ResultCallback<Message>) null);
            }
        });
        mqClient.onMessage(new io.zbus.transport.MessageHandler<Message>() { // from class: io.zbus.mq.Broker.3
            @Override // io.zbus.transport.MessageHandler
            public void handle(Message message, Session session) throws IOException {
                if (message.getStatus().intValue() != 200) {
                    Broker.log.error(message.getBodyString());
                    return;
                }
                if (!Protocol.TRACK_PUB.equals(message.getCommand())) {
                    Broker.log.error("Unknown message: " + message);
                }
                Protocol.TrackerInfo trackerInfo = (Protocol.TrackerInfo) JsonKit.parseObject(message.getBodyString(), Protocol.TrackerInfo.class);
                m33clone.setAddress(trackerInfo.serverAddress.address);
                if (m33clone.getServer() != null) {
                    String serverAddress2 = trackerInfo.serverAddress.toString();
                    trackerInfo.serverAddress = m33clone;
                    if (trackerInfo.serverTable.containsKey(serverAddress2)) {
                        Protocol.ServerInfo remove = trackerInfo.serverTable.remove(serverAddress2);
                        remove.serverAddress = m33clone;
                        trackerInfo.serverTable.put(m33clone.toString(), remove);
                        Iterator<Protocol.TopicInfo> it = remove.topicTable.values().iterator();
                        while (it.hasNext()) {
                            it.next().serverAddress = m33clone;
                        }
                    }
                }
                List<ServerAddress> updateTracker = Broker.this.routeTable.updateTracker(trackerInfo);
                Iterator<Protocol.ServerInfo> it2 = Broker.this.routeTable.serverTable().values().iterator();
                while (it2.hasNext()) {
                    Broker.this.addServer(it2.next(), m33clone);
                }
                if (!updateTracker.isEmpty()) {
                    Iterator<ServerAddress> it3 = updateTracker.iterator();
                    while (it3.hasNext()) {
                        Broker.this.removeServer(it3.next());
                    }
                }
                if (Broker.this.waitCheck) {
                    Broker.this.ready.countDown();
                }
            }
        });
        mqClient.ensureConnectedAsync();
    }

    private void addServer(ServerAddress serverAddress, ServerAddress serverAddress2) throws IOException {
        synchronized (this.poolTable) {
            if (this.poolTable.get(serverAddress) != null) {
                return;
            }
            try {
                if (serverAddress.isSslEnabled()) {
                    MqClient mqClient = new MqClient(serverAddress2, this.eventLoop);
                    String querySslCertificate = mqClient.querySslCertificate(serverAddress.getAddress());
                    mqClient.close();
                    if (querySslCertificate == null) {
                        throw new IllegalStateException("MqServer SSL enabled, but certificate not found");
                    }
                    serverAddress.setCertificate(querySslCertificate);
                }
                final MqClientPool mqClientPool = new MqClientPool(serverAddress, this.clientPoolSize, this.eventLoop);
                this.poolTable.put(serverAddress, mqClientPool);
                this.eventLoop.getGroup().submit(new Runnable() { // from class: io.zbus.mq.Broker.4
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            for (final ServerNotifyListener serverNotifyListener : Broker.this.listeners) {
                                Broker.this.eventLoop.getGroup().submit(new Runnable() { // from class: io.zbus.mq.Broker.4.1
                                    @Override // java.lang.Runnable
                                    public void run() {
                                        serverNotifyListener.onServerJoin(mqClientPool);
                                    }
                                });
                            }
                        } catch (Exception e) {
                            Broker.log.error(e.getMessage(), e);
                        }
                    }
                });
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addServer(Protocol.ServerInfo serverInfo, ServerAddress serverAddress) throws IOException {
        addServer(serverInfo.serverAddress, serverAddress);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeServer(final ServerAddress serverAddress) {
        synchronized (this.poolTable) {
            final MqClientPool remove = this.poolTable.remove(serverAddress);
            if (remove == null) {
                return;
            }
            this.eventLoop.getGroup().schedule(new Runnable() { // from class: io.zbus.mq.Broker.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        remove.close();
                    } catch (IOException e) {
                        Broker.log.error(e.getMessage(), e);
                    }
                }
            }, 1000L, TimeUnit.MILLISECONDS);
            for (final ServerNotifyListener serverNotifyListener : this.listeners) {
                this.eventLoop.getGroup().submit(new Runnable() { // from class: io.zbus.mq.Broker.6
                    @Override // java.lang.Runnable
                    public void run() {
                        serverNotifyListener.onServerLeave(serverAddress);
                    }
                });
            }
        }
    }

    public EventLoop getEventLoop() {
        return this.eventLoop;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Iterator<MqClient> it = this.trackerSubscribers.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.trackerSubscribers.clear();
        synchronized (this.poolTable) {
            Iterator<MqClientPool> it2 = this.poolTable.values().iterator();
            while (it2.hasNext()) {
                it2.next().close();
            }
            this.poolTable.clear();
        }
        this.eventLoop.close();
    }

    public MqClientPool[] selectClient(ServerSelector serverSelector, Message message) {
        checkReady();
        ServerAddress[] select = serverSelector.select(this.routeTable, message);
        if (select == null) {
            return (MqClientPool[]) this.poolTable.values().toArray(new MqClientPool[0]);
        }
        MqClientPool[] mqClientPoolArr = new MqClientPool[select.length];
        int i = 0;
        for (int i2 = 0; i2 < select.length; i2++) {
            mqClientPoolArr[i2] = this.poolTable.get(select[i2]);
            if (mqClientPoolArr[i2] != null) {
                i++;
            }
        }
        if (i == select.length) {
            return mqClientPoolArr;
        }
        MqClientPool[] mqClientPoolArr2 = new MqClientPool[i];
        int i3 = 0;
        for (int i4 = 0; i4 < mqClientPoolArr.length; i4++) {
            if (mqClientPoolArr[i4] != null) {
                int i5 = i3;
                i3++;
                mqClientPoolArr2[i5] = mqClientPoolArr[i4];
            }
        }
        return mqClientPoolArr2;
    }

    public void addServerNotifyListener(ServerNotifyListener serverNotifyListener) {
        this.listeners.add(serverNotifyListener);
    }

    public void removeServerNotifyListener(ServerNotifyListener serverNotifyListener) {
        this.listeners.remove(serverNotifyListener);
    }

    private void checkReady() {
        if (this.waitCheck) {
            try {
                this.ready.await(this.readyTimeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
            this.waitCheck = false;
        }
    }
}
