package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.MessageAccumulator;
import com.rabbitmq.stream.impl.StreamProducer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/SimpleMessageAccumulator.class */
public class SimpleMessageAccumulator implements MessageAccumulator {
    protected final BlockingQueue<MessageAccumulator.AccumulatedEntity> messages;
    protected final Clock clock;
    private final int capacity;
    private final Codec codec;
    private final int maxFrameSize;
    private final ToLongFunction<Message> publishSequenceFunction;

    /* loaded from: input_file:com/rabbitmq/stream/impl/SimpleMessageAccumulator$SimpleAccumulatedEntity.class */
    private static final class SimpleAccumulatedEntity implements MessageAccumulator.AccumulatedEntity {
        private final long time;
        private final long publishingId;
        private final Codec.EncodedMessage encodedMessage;
        private final StreamProducer.ConfirmationCallback confirmationCallback;

        private SimpleAccumulatedEntity(long j, long j2, Codec.EncodedMessage encodedMessage, StreamProducer.ConfirmationCallback confirmationCallback) {
            this.time = j;
            this.publishingId = j2;
            this.encodedMessage = encodedMessage;
            this.confirmationCallback = confirmationCallback;
        }

        @Override // com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity
        public long publishindId() {
            return this.publishingId;
        }

        @Override // com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity
        public Object encodedEntity() {
            return this.encodedMessage;
        }

        @Override // com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity
        public long time() {
            return this.time;
        }

        @Override // com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity
        public StreamProducer.ConfirmationCallback confirmationCallback() {
            return this.confirmationCallback;
        }
    }

    /* loaded from: input_file:com/rabbitmq/stream/impl/SimpleMessageAccumulator$SimpleConfirmationCallback.class */
    private static final class SimpleConfirmationCallback implements StreamProducer.ConfirmationCallback {
        private final Message message;
        private final ConfirmationHandler confirmationHandler;

        private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
            this.message = message;
            this.confirmationHandler = confirmationHandler;
        }

        @Override // com.rabbitmq.stream.impl.StreamProducer.ConfirmationCallback
        public int handle(boolean z, short s) {
            this.confirmationHandler.handle(new ConfirmationStatus(this.message, z, s));
            return 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SimpleMessageAccumulator(int i, Codec codec, int i2, ToLongFunction<Message> toLongFunction, Clock clock) {
        this.capacity = i;
        this.messages = new LinkedBlockingQueue(i);
        this.codec = codec;
        this.maxFrameSize = i2;
        this.publishSequenceFunction = toLongFunction;
        this.clock = clock;
    }

    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public boolean add(Message message, ConfirmationHandler confirmationHandler) {
        Codec.EncodedMessage encode = this.codec.encode(message);
        Client.checkMessageFitsInFrame(this.maxFrameSize, encode);
        try {
            if (this.messages.offer(new SimpleAccumulatedEntity(this.clock.time(), this.publishSequenceFunction.applyAsLong(message), encode, new SimpleConfirmationCallback(message, confirmationHandler)), 60L, TimeUnit.SECONDS)) {
                return this.messages.size() == this.capacity;
            }
            throw new StreamException("Could not accumulate outbound message");
        } catch (InterruptedException e) {
            throw new StreamException("Error while accumulating outbound message", e);
        }
    }

    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public MessageAccumulator.AccumulatedEntity get() {
        return this.messages.poll();
    }

    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public boolean isEmpty() {
        return this.messages.isEmpty();
    }

    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public int size() {
        return this.messages.size();
    }
}
