package io.zbus.mq;

import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.AbstractQueue;
import io.zbus.mq.Protocol;
import io.zbus.transport.Session;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/* loaded from: input_file:io/zbus/mq/MemoryQueue.class */
/**
 * Message queue that keeps its messages in memory.
 *
 * <p>Messages are held in a single {@link ConcurrentLinkedQueue}. Every
 * consume group declared on this queue reads from that same underlying queue
 * (see {@link MemoryConsumeGroup#read()}), so a message removed by one reader
 * is no longer available to any other reader.
 *
 * <p>The queue is bounded by {@link #capacity}: when an insert pushes the
 * size above the capacity, the message at the head of the queue (the oldest
 * one) is dropped and a warning is logged.
 */
public class MemoryQueue extends AbstractQueue {
    private static final Logger log = LoggerFactory.getLogger(MemoryQueue.class);

    /** Capacity assigned to every new queue by the constructor. */
    private static final int DEFAULT_CAPACITY = 1000;

    /** Backing store shared by all consume groups of this queue. */
    protected Queue<Message> queue;
    /** Number of messages above which {@link #produce(Message)} drops the head message. */
    protected int capacity;
    /** Mask value exposed through {@link #getMask()} / {@link #setMask(int)}. */
    protected int mask;
    /** Creator exposed through {@link #getCreator()} / {@link #setCreator(String)}. */
    protected String creator;
    /** Wall-clock time (milliseconds) at which this queue object was constructed. */
    protected long createdTime;

    /**
     * Consume group of a {@link MemoryQueue}.
     *
     * <p>The group keeps only metadata (filter, mask, timestamps); messages
     * are read straight from the enclosing queue's shared backing store.
     */
    class MemoryConsumeGroup extends AbstractQueue.AbstractConsumeGroup {
        private String filter;
        /** May be {@code null}; reported as {@code 0} by {@link #getConsumeGroupInfo()}. */
        private Integer mask;
        private long createdTime;
        private long updatedTime;

        /**
         * Creates a group and stamps its created/updated times with the current time.
         *
         * @param str group name, passed unchanged to the superclass constructor
         * @throws IOException declared by the superclass constructor
         */
        public MemoryConsumeGroup(String str) throws IOException {
            super(str);
            this.createdTime = System.currentTimeMillis();
            this.updatedTime = System.currentTimeMillis();
        }

        /**
         * Forgets the given session: removes it from {@code pullSessions} by id
         * and removes the first pending pull in {@code pullQ} that belongs to it.
         *
         * @param session session to remove; must not be {@code null}
         */
        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public void removeSession(Session session) {
            this.pullSessions.remove(session.id());
            for (Iterator<?> it = this.pullQ.iterator(); it.hasNext();) {
                PullSession pending = (PullSession) it.next();
                // Identity comparison: only a pull registered by this very session object matches.
                if (pending.session == session) {
                    it.remove();
                    return; // at most one pending pull is removed per call
                }
            }
        }

        /**
         * Removes and returns the head of the enclosing queue's backing store.
         *
         * @return the next message, or {@code null} when the queue is empty
         */
        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public Message read() throws IOException {
            return MemoryQueue.this.queue.poll();
        }

        /**
         * Tells whether there is nothing left to read.
         *
         * @return {@code true} when the enclosing queue holds no messages
         */
        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public boolean isEnd() {
            // isEmpty() instead of size() == 0: ConcurrentLinkedQueue.size() traverses the queue.
            return MemoryQueue.this.queue.isEmpty();
        }

        /**
         * Builds a snapshot describing this group.
         *
         * <p>{@code messageCount} is the size of the shared backing queue, and
         * {@code creator} is always reported as {@code null}.
         *
         * @return a freshly populated {@link Protocol.ConsumeGroupInfo}
         */
        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public Protocol.ConsumeGroupInfo getConsumeGroupInfo() {
            Protocol.ConsumeGroupInfo info = new Protocol.ConsumeGroupInfo();
            info.topicName = MemoryQueue.this.topic;
            info.groupName = this.groupName;
            info.filter = this.filter;
            info.creator = null;
            info.mask = this.mask == null ? 0 : this.mask.intValue();
            info.createdTime = this.createdTime;
            info.lastUpdatedTime = this.updatedTime;
            info.consumerCount = this.pullSessions.size();
            info.messageCount = MemoryQueue.this.queue.size();
            // NOTE(review): raw type kept on purpose; the element type of consumerList
            // is declared in Protocol, which is outside this file.
            info.consumerList = new ArrayList();
            for (Session consumer : this.pullSessions.values()) {
                info.consumerList.add(consumer.remoteAddress());
            }
            return info;
        }
    }

    /**
     * Creates an empty queue with the default capacity, mask {@code 0} and an
     * empty creator string.
     *
     * @param str passed unchanged to the {@link AbstractQueue} constructor
     */
    public MemoryQueue(String str) {
        super(str);
        this.queue = new ConcurrentLinkedQueue<Message>();
        this.capacity = DEFAULT_CAPACITY;
        this.mask = 0;
        this.creator = "";
        this.createdTime = System.currentTimeMillis();
    }

    /**
     * Appends a message, drops the head message if the queue has grown past
     * {@link #capacity}, refreshes {@code lastUpdatedTime} and calls
     * {@code dispatch()}.
     *
     * @param message message to append
     */
    @Override // io.zbus.mq.MessageQueue
    public void produce(Message message) throws IOException {
        this.queue.offer(message);
        // The new message is always accepted; on overflow it is the head (oldest) entry that is discarded.
        if (this.queue.size() > this.capacity) {
            this.queue.poll();
            log.warn("Memory queue full, message discarded");
        }
        this.lastUpdatedTime = System.currentTimeMillis();
        dispatch();
    }

    /**
     * Removes and returns the head message.
     *
     * @param str not used by this implementation; every caller reads from the same queue
     * @return the next message, or {@code null} when the queue is empty
     */
    @Override // io.zbus.mq.MessageQueue
    public Message consume(String str) throws IOException {
        return this.queue.poll();
    }

    /**
     * Returns the info of the named consume group, creating the group first
     * when it is not registered yet.
     *
     * <p>A {@code null} group name falls back to this queue's topic. The filter
     * and mask of {@code consumeGroup} are applied only when the group is
     * created; for an already registered group they are ignored.
     *
     * @param consumeGroup declaration carrying the group name, filter and mask
     * @return info of the existing or newly created group
     */
    @Override // io.zbus.mq.MessageQueue
    public Protocol.ConsumeGroupInfo declareGroup(ConsumeGroup consumeGroup) throws Exception {
        String groupName = consumeGroup.getGroupName();
        if (groupName == null) {
            groupName = this.topic;
        }
        MemoryConsumeGroup group = (MemoryConsumeGroup) this.consumeGroups.get(groupName);
        if (group == null) {
            group = new MemoryConsumeGroup(groupName);
            group.filter = consumeGroup.getFilter();
            group.mask = consumeGroup.getMask();
            this.consumeGroups.put(groupName, group);
            log.info("ConsumeGroup created: %s", group);
        }
        return group.getConsumeGroupInfo();
    }

    /** @return the time (milliseconds) recorded when this queue was constructed */
    @Override // io.zbus.mq.MessageQueue
    public long createdTime() {
        return this.createdTime;
    }

    /** @return the number of messages currently held in the backing queue */
    @Override // io.zbus.mq.MessageQueue
    public long messageDepth() {
        return this.queue.size();
    }

    /** @return the creator last set, or the empty string if none was set */
    @Override // io.zbus.mq.MessageQueue
    public String getCreator() {
        return this.creator;
    }

    /** @param str new creator value, stored as given */
    @Override // io.zbus.mq.MessageQueue
    public void setCreator(String str) {
        this.creator = str;
    }

    /** @return the mask last set, {@code 0} by default */
    @Override // io.zbus.mq.MessageQueue
    public int getMask() {
        return this.mask;
    }

    /** @param i new mask value, stored as given */
    @Override // io.zbus.mq.MessageQueue
    public void setMask(int i) {
        this.mask = i;
    }

    // ---------------------------------------------------------------------
    // The overrides below add no behavior: each one forwards to AbstractQueue.
    // NOTE(review): they look like decompiler renderings of compiler-generated
    // bridge methods; confirm against AbstractQueue before removing them.
    // ---------------------------------------------------------------------

    @Override // io.zbus.mq.AbstractQueue
    public String toString() {
        return super.toString();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public Protocol.TopicInfo topicInfo() {
        return super.topicInfo();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public void cleanSession(Session session) {
        super.cleanSession(session);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public int sessionCount(String str) {
        return super.sessionCount(str);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public void consume(Message message, Session session) throws IOException {
        super.consume(message, session);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public void unconsume(Message message, Session session) throws IOException {
        super.unconsume(message, session);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public void removeGroup(String str) throws IOException {
        super.removeGroup(str);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public Protocol.ConsumeGroupInfo groupInfo(String str) {
        return super.groupInfo(str);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public long updatedTime() {
        return super.updatedTime();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public String topic() {
        return super.topic();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public void destroy() throws IOException {
        super.destroy();
    }
}
