package io.zbus.mq;

import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.AbstractQueue;
import io.zbus.mq.Protocol;
import io.zbus.mq.disk.DiskMessage;
import io.zbus.mq.disk.Index;
import io.zbus.mq.disk.QueueReader;
import io.zbus.mq.disk.QueueWriter;
import io.zbus.transport.Session;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;

/* loaded from: input_file:io/zbus/mq/DiskQueue.class */
public class DiskQueue extends AbstractQueue {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DiskQueue.class);
    protected final Index index;
    protected final QueueWriter writer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zbus/mq/DiskQueue$DiskConsumeGroup.class */
    public class DiskConsumeGroup extends AbstractQueue.AbstractConsumeGroup {
        public final QueueReader reader;

        public DiskConsumeGroup(Index index, String str) throws IOException {
            super(str);
            this.reader = new QueueReader(index, this.groupName);
        }

        public DiskConsumeGroup(QueueReader queueReader, String str) throws IOException {
            super(str);
            this.reader = new QueueReader(queueReader, str);
        }

        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public boolean isEnd() {
            try {
                return this.reader.isEOF();
            } catch (IOException e) {
                return true;
            }
        }

        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public Message read() throws IOException {
            DiskMessage read = this.reader.read();
            if (read == null) {
                return null;
            }
            Message parse = Message.parse(read.body);
            if (parse == null) {
                DiskQueue.log.warn("data read from queue can not be serialized back to Message type");
            } else {
                parse.setOffset(read.offset);
            }
            return parse;
        }

        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.reader.close();
        }

        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public void delete() throws IOException {
            this.reader.delete();
        }

        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public Integer getMask() {
            return Integer.valueOf(this.reader.getMask());
        }

        @Override // io.zbus.mq.AbstractQueue.AbstractConsumeGroup
        public Protocol.ConsumeGroupInfo getConsumeGroupInfo() {
            Protocol.ConsumeGroupInfo consumeGroupInfo = new Protocol.ConsumeGroupInfo();
            consumeGroupInfo.topicName = this.reader.getIndexName();
            consumeGroupInfo.filter = this.reader.getFilter();
            consumeGroupInfo.creator = this.reader.getCreator();
            consumeGroupInfo.mask = this.reader.getMask();
            consumeGroupInfo.createdTime = this.reader.getCreatedTime();
            consumeGroupInfo.lastUpdatedTime = this.reader.getUpdatedTime();
            consumeGroupInfo.consumerCount = this.pullSessions.size();
            consumeGroupInfo.messageCount = this.reader.getMessageCount();
            consumeGroupInfo.groupName = this.groupName;
            consumeGroupInfo.consumerList = new ArrayList();
            Iterator<Session> it = this.pullSessions.values().iterator();
            while (it.hasNext()) {
                consumeGroupInfo.consumerList.add(it.next().remoteAddress());
            }
            return consumeGroupInfo;
        }
    }

    public DiskQueue(File file) throws IOException {
        this.index = new Index(file);
        this.topic = this.index.getName();
        this.writer = new QueueWriter(this.index);
        loadConsumeGroups();
    }

    @Override // io.zbus.mq.AbstractQueue
    protected void loadConsumeGroups() throws IOException {
        File[] listFiles = this.index.getReaderDir().listFiles(new FileFilter() { // from class: io.zbus.mq.DiskQueue.1
            @Override // java.io.FileFilter
            public boolean accept(File file) {
                return Index.isReaderFile(file);
            }
        });
        if (listFiles == null || listFiles.length <= 0) {
            return;
        }
        for (File file : listFiles) {
            String name = file.getName();
            String substring = name.substring(0, name.lastIndexOf(46));
            this.consumeGroups.put(substring, new DiskConsumeGroup(this.index, substring));
        }
    }

    @Override // io.zbus.mq.MessageQueue
    public Protocol.ConsumeGroupInfo declareGroup(ConsumeGroup consumeGroup) throws Exception {
        DiskConsumeGroup diskConsumeGroup;
        String groupName = consumeGroup.getGroupName();
        if (groupName == null) {
            groupName = nextGroupName();
        }
        DiskConsumeGroup diskConsumeGroup2 = (DiskConsumeGroup) this.consumeGroups.get(groupName);
        if (diskConsumeGroup2 == null) {
            QueueReader queueReader = null;
            if (consumeGroup.getStartCopy() != null && (diskConsumeGroup = (DiskConsumeGroup) this.consumeGroups.get(consumeGroup.getStartCopy())) != null) {
                queueReader = diskConsumeGroup.reader;
            }
            if (queueReader == null) {
                queueReader = findLatestReader();
            }
            diskConsumeGroup2 = queueReader != null ? new DiskConsumeGroup(queueReader, groupName) : new DiskConsumeGroup(this.index, groupName);
            if (consumeGroup.getCreator() != null) {
                diskConsumeGroup2.reader.setCreator(consumeGroup.getCreator());
            }
            this.consumeGroups.put(groupName, diskConsumeGroup2);
            log.info("ConsumeGroup created: %s", diskConsumeGroup2);
        }
        diskConsumeGroup2.reader.setFilter(consumeGroup.getFilter());
        Integer mask = consumeGroup.getMask();
        if (mask != null) {
            diskConsumeGroup2.reader.setMask(mask.intValue());
        }
        if (consumeGroup.getStartOffset() != null) {
            if (!diskConsumeGroup2.reader.seek(consumeGroup.getStartOffset().longValue(), consumeGroup.getStartMsgId())) {
                throw new IllegalArgumentException(String.format("seek by offset unsuccessfull: (offset=%d, msgid=%s)", consumeGroup.getStartOffset(), consumeGroup.getStartMsgId()));
            }
        } else if (consumeGroup.getStartTime() != null && !diskConsumeGroup2.reader.seek(consumeGroup.getStartTime().longValue())) {
            throw new IllegalArgumentException(String.format("seek by time unsuccessfull: (time=%d)", consumeGroup.getStartTime()));
        }
        return diskConsumeGroup2.getConsumeGroupInfo();
    }

    private QueueReader findLatestReader() {
        if (this.consumeGroups.isEmpty()) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<AbstractQueue.AbstractConsumeGroup> it = this.consumeGroups.values().iterator();
        while (it.hasNext()) {
            arrayList.add((DiskConsumeGroup) it.next());
        }
        Collections.sort(arrayList, new Comparator<DiskConsumeGroup>() { // from class: io.zbus.mq.DiskQueue.2
            @Override // java.util.Comparator
            public int compare(DiskConsumeGroup diskConsumeGroup, DiskConsumeGroup diskConsumeGroup2) {
                return -diskConsumeGroup.reader.compareTo(diskConsumeGroup2.reader);
            }
        });
        return ((DiskConsumeGroup) arrayList.get(0)).reader;
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public void destroy() throws IOException {
        Iterator<AbstractQueue.AbstractConsumeGroup> it = this.consumeGroups.values().iterator();
        while (it.hasNext()) {
            it.next().delete();
        }
        this.writer.close();
        this.index.delete();
    }

    @Override // io.zbus.mq.MessageQueue
    public void produce(Message message) throws IOException {
        DiskMessage diskMessage = new DiskMessage();
        diskMessage.id = message.getId();
        diskMessage.tag = message.getTag();
        diskMessage.body = message.toBytes();
        this.writer.write(diskMessage);
        this.lastUpdatedTime = System.currentTimeMillis();
        dispatch();
    }

    @Override // io.zbus.mq.MessageQueue
    public Message consume(String str) throws IOException {
        DiskConsumeGroup diskConsumeGroup = (DiskConsumeGroup) this.consumeGroups.get(str);
        if (diskConsumeGroup == null) {
            throw new IllegalArgumentException("ConsumeGroup(" + str + ") not found");
        }
        return diskConsumeGroup.read();
    }

    @Override // io.zbus.mq.MessageQueue
    public String getCreator() {
        return this.index.getCreator();
    }

    @Override // io.zbus.mq.MessageQueue
    public void setCreator(String str) {
        this.index.setCreator(str);
    }

    @Override // io.zbus.mq.MessageQueue
    public int getMask() {
        return this.index.getMask();
    }

    @Override // io.zbus.mq.MessageQueue
    public void setMask(int i) {
        this.index.setMask(i);
    }

    @Override // io.zbus.mq.MessageQueue
    public long createdTime() {
        return this.index.getCreatedTime();
    }

    @Override // io.zbus.mq.MessageQueue
    public long messageDepth() {
        return this.index.getMessageCount();
    }

    @Override // io.zbus.mq.AbstractQueue
    public /* bridge */ /* synthetic */ String toString() {
        return super.toString();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ Protocol.TopicInfo topicInfo() {
        return super.topicInfo();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ void cleanSession(Session session) {
        super.cleanSession(session);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ int sessionCount(String str) {
        return super.sessionCount(str);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ void consume(Message message, Session session) throws IOException {
        super.consume(message, session);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ void unconsume(Message message, Session session) throws IOException {
        super.unconsume(message, session);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ void removeGroup(String str) throws IOException {
        super.removeGroup(str);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ Protocol.ConsumeGroupInfo groupInfo(String str) {
        return super.groupInfo(str);
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ long updatedTime() {
        return super.updatedTime();
    }

    @Override // io.zbus.mq.AbstractQueue, io.zbus.mq.MessageQueue
    public /* bridge */ /* synthetic */ String topic() {
        return super.topic();
    }
}
