package io.zbus.mq;

import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.Protocol;
import io.zbus.mq.server.ReplyKit;
import io.zbus.transport.Session;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;

/* compiled from: MessageQueue.java */
/* loaded from: input_file:io/zbus/mq/AbstractQueue.class */
abstract class AbstractQueue implements MessageQueue {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractQueue.class);
    protected String topic;
    protected Map<String, AbstractConsumeGroup> consumeGroups = new ConcurrentSkipListMap(String.CASE_INSENSITIVE_ORDER);
    protected long lastUpdatedTime = System.currentTimeMillis();
    protected long groupNumber = this.consumeGroups.size();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* compiled from: MessageQueue.java */
    /* loaded from: input_file:io/zbus/mq/AbstractQueue$AbstractConsumeGroup.class */
    public static abstract class AbstractConsumeGroup implements Closeable {
        public final String groupName;
        public final BlockingQueue<PullSession> pullQ = new LinkedBlockingQueue();
        public final Map<String, Session> pullSessions = new ConcurrentHashMap();

        public AbstractConsumeGroup(String str) throws IOException {
            this.groupName = str;
        }

        public void removeSession(Session session) {
            this.pullSessions.remove(session.id());
            Iterator it = this.pullQ.iterator();
            while (it.hasNext()) {
                if (((PullSession) it.next()).session == session) {
                    it.remove();
                    return;
                }
            }
        }

        public abstract Message read() throws IOException;

        public abstract boolean isEnd();

        public Integer getMask() {
            return 0;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        public void delete() throws IOException {
        }

        public abstract Protocol.ConsumeGroupInfo getConsumeGroupInfo();

        public String toString() {
            return getClass().getSimpleName() + "[" + this.groupName + "]";
        }
    }

    public AbstractQueue() {
    }

    public AbstractQueue(String str) {
        this.topic = str;
    }

    protected void loadConsumeGroups() throws IOException {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String nextGroupName() {
        if (!this.consumeGroups.containsKey(this.topic)) {
            return this.topic;
        }
        while (true) {
            String str = this.topic + this.groupNumber;
            if (!this.consumeGroups.containsKey(str)) {
                return str;
            }
            this.groupNumber++;
        }
    }

    @Override // io.zbus.mq.MessageQueue
    public void destroy() throws IOException {
    }

    @Override // io.zbus.mq.MessageQueue
    public String topic() {
        return this.topic;
    }

    @Override // io.zbus.mq.MessageQueue
    public long updatedTime() {
        return this.lastUpdatedTime;
    }

    @Override // io.zbus.mq.MessageQueue
    public Protocol.ConsumeGroupInfo groupInfo(String str) {
        AbstractConsumeGroup abstractConsumeGroup = this.consumeGroups.get(str);
        if (abstractConsumeGroup == null) {
            return null;
        }
        return abstractConsumeGroup.getConsumeGroupInfo();
    }

    @Override // io.zbus.mq.MessageQueue
    public void removeGroup(String str) throws IOException {
        AbstractConsumeGroup remove = this.consumeGroups.remove(str);
        if (remove == null) {
            throw new MqException("ConsumeGroup(" + str + ") Not Found");
        }
        remove.delete();
    }

    @Override // io.zbus.mq.MessageQueue
    public void unconsume(Message message, Session session) throws IOException {
        String consumeGroup = message.getConsumeGroup();
        if (consumeGroup == null) {
            consumeGroup = this.topic;
        }
        AbstractConsumeGroup abstractConsumeGroup = this.consumeGroups.get(consumeGroup);
        if (abstractConsumeGroup != null) {
            abstractConsumeGroup.removeSession(session);
        } else {
            message.setBody(consumeGroup + " not found");
            ReplyKit.reply404(message, session, "ConsumeGroup(" + consumeGroup + ") Not Found");
        }
    }

    @Override // io.zbus.mq.MessageQueue
    public void consume(Message message, Session session) throws IOException {
        String consumeGroup = message.getConsumeGroup();
        if (consumeGroup == null) {
            consumeGroup = this.topic;
        }
        AbstractConsumeGroup abstractConsumeGroup = this.consumeGroups.get(consumeGroup);
        if (abstractConsumeGroup == null) {
            message.setBody(consumeGroup + " not found");
            ReplyKit.reply404(message, session, "ConsumeGroup(" + consumeGroup + ") Not Found");
            return;
        }
        if (!abstractConsumeGroup.pullSessions.containsKey(session.id())) {
            if (abstractConsumeGroup.pullSessions.size() > 0 && (abstractConsumeGroup.getMask().intValue() & 16) != 0) {
                ReplyKit.reply401(message, session, String.format("ConsumeGroup(%s) exclusive, forbbiden", consumeGroup));
                return;
            }
            abstractConsumeGroup.pullSessions.put(session.id(), session);
        }
        for (PullSession pullSession : abstractConsumeGroup.pullQ) {
            if (pullSession.getSession() == session) {
                pullSession.setPullMessage(message);
                dispatch(abstractConsumeGroup);
                return;
            }
        }
        abstractConsumeGroup.pullQ.offer(new PullSession(session, message));
        dispatch(abstractConsumeGroup);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void dispatch() throws IOException {
        Iterator<Map.Entry<String, AbstractConsumeGroup>> it = this.consumeGroups.entrySet().iterator();
        while (it.hasNext()) {
            dispatch(it.next().getValue());
        }
    }

    protected void dispatch(AbstractConsumeGroup abstractConsumeGroup) throws IOException {
        PullSession poll;
        while (abstractConsumeGroup.pullQ.peek() != null && !abstractConsumeGroup.isEnd() && (poll = abstractConsumeGroup.pullQ.poll()) != null) {
            if (poll.getSession().active()) {
                Message read = abstractConsumeGroup.read();
                if (read == null) {
                    abstractConsumeGroup.pullQ.offer(poll);
                    return;
                }
                this.lastUpdatedTime = System.currentTimeMillis();
                try {
                    Message pullMessage = poll.getPullMessage();
                    Message copyWithoutBody = Message.copyWithoutBody(read);
                    copyWithoutBody.removeHeader(Protocol.TOKEN);
                    copyWithoutBody.setOriginId(read.getId());
                    copyWithoutBody.setId(pullMessage.getId());
                    Integer status = copyWithoutBody.getStatus();
                    if (status == null) {
                        copyWithoutBody.setOriginMethod(copyWithoutBody.getMethod());
                        if (!"/".equals(copyWithoutBody.getUrl())) {
                            copyWithoutBody.setOriginUrl(copyWithoutBody.getUrl());
                        }
                    } else {
                        copyWithoutBody.setOriginStatus(status);
                    }
                    copyWithoutBody.setStatus(200);
                    poll.getSession().write(copyWithoutBody);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
        }
    }

    @Override // io.zbus.mq.MessageQueue
    public int sessionCount(String str) {
        if (str == null) {
            str = this.topic;
        }
        AbstractConsumeGroup abstractConsumeGroup = this.consumeGroups.get(str);
        if (abstractConsumeGroup == null) {
            return 0;
        }
        return abstractConsumeGroup.pullQ.size();
    }

    @Override // io.zbus.mq.MessageQueue
    public void cleanSession(Session session) {
        if (session == null) {
            cleanInactiveSessions();
            return;
        }
        Iterator<Map.Entry<String, AbstractConsumeGroup>> it = this.consumeGroups.entrySet().iterator();
        while (it.hasNext()) {
            cleanSession(it.next().getValue(), session);
        }
    }

    private void cleanSession(AbstractConsumeGroup abstractConsumeGroup, Session session) {
        abstractConsumeGroup.pullSessions.remove(session.id());
        Iterator it = abstractConsumeGroup.pullQ.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            } else if (session == ((PullSession) it.next()).session) {
                it.remove();
                break;
            }
        }
        if ((abstractConsumeGroup.getMask().intValue() & 32) == 0 || abstractConsumeGroup.pullSessions.size() != 0) {
            return;
        }
        try {
            abstractConsumeGroup.delete();
        } catch (IOException e) {
            log.warn(e.getMessage());
        }
    }

    private void cleanInactiveSessions() {
        Iterator<Map.Entry<String, AbstractConsumeGroup>> it = this.consumeGroups.entrySet().iterator();
        while (it.hasNext()) {
            AbstractConsumeGroup value = it.next().getValue();
            Iterator it2 = value.pullQ.iterator();
            while (it2.hasNext()) {
                PullSession pullSession = (PullSession) it2.next();
                if (!pullSession.session.active()) {
                    value.pullSessions.remove(pullSession.session.id());
                    it2.remove();
                }
            }
            if ((value.getMask().intValue() & 32) != 0 && value.pullSessions.size() == 0) {
                it.remove();
                try {
                    value.delete();
                } catch (IOException e) {
                    log.warn(e.getMessage());
                }
            }
        }
    }

    @Override // io.zbus.mq.MessageQueue
    public Protocol.TopicInfo topicInfo() {
        Protocol.TopicInfo topicInfo = new Protocol.TopicInfo();
        topicInfo.topicName = this.topic;
        topicInfo.createdTime = createdTime();
        topicInfo.lastUpdatedTime = updatedTime();
        topicInfo.mask = getMask();
        topicInfo.messageDepth = messageDepth();
        topicInfo.consumerCount = 0;
        topicInfo.consumeGroupList = new ArrayList();
        Iterator<AbstractConsumeGroup> it = this.consumeGroups.values().iterator();
        while (it.hasNext()) {
            Protocol.ConsumeGroupInfo consumeGroupInfo = it.next().getConsumeGroupInfo();
            topicInfo.consumerCount += consumeGroupInfo.consumerCount;
            topicInfo.consumeGroupList.add(consumeGroupInfo);
        }
        return topicInfo;
    }

    public String toString() {
        return getClass().getSimpleName() + "[" + this.topic + "]";
    }
}
