package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/StreamConsumer.class */
/**
 * {@link Consumer} implementation bound to a single stream and registered with a
 * {@link StreamEnvironment}.
 *
 * <p>The instance keeps two pieces of lifecycle state: an atomic {@code closed} flag that makes
 * {@link #close()} and {@link #closeAfterStreamDeletion()} run at most once, and a volatile
 * {@link Status} that gates offset tracking (tracking calls are only attempted while the status
 * is {@link Status#RUNNING}).
 */
public class StreamConsumer implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
    /** Callback returned by the environment when the consumer is registered; run on close. */
    private final Runnable closingCallback;
    /** Callback releasing the tracking registration; a no-op when tracking is disabled. */
    private final Runnable closingTrackingCallback;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final String name;
    private final String stream;
    private final StreamEnvironment environment;
    /** Client used for offset store/query calls; {@code null} after {@link #unavailable()}. */
    private volatile Client trackingClient;
    private volatile Status status;
    /** Notified with every offset passed to {@link #store(long)}. */
    private final LongConsumer trackingCallback;

    /** Lifecycle states of a consumer. */
    public enum Status {
        /** Set at the end of construction and by {@link StreamConsumer#running()}. */
        RUNNING,
        /** Set by {@link StreamConsumer#unavailable()}; offset tracking is skipped. */
        NOT_AVAILABLE,
        /** Set once the consumer has been closed. */
        CLOSED
    }

    /**
     * Creates the consumer and registers it with the environment.
     *
     * <p>When tracking is enabled in {@code trackingConfiguration}, the consumer is first
     * registered as a tracking consumer and, if the registration supplies a post-processing
     * callback, the message handler is wrapped so that callback runs after each message.
     *
     * @param str the stream this consumer is attached to
     * @param offsetSpecification forwarded as-is to the environment on registration
     * @param messageHandler handler invoked for each message
     * @param str2 the consumer name, forwarded on registration and passed to the tracking
     *     client on offset store/query calls
     * @param streamEnvironment the environment the consumer registers with
     * @param trackingConfiguration decides whether tracking is enabled
     * @throws RuntimeException rethrown from registration, after the instance has been marked
     *     closed
     */
    public StreamConsumer(String str, OffsetSpecification offsetSpecification, MessageHandler messageHandler, String str2, StreamEnvironment streamEnvironment, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        MessageHandler effectiveHandler;
        try {
            this.name = str2;
            this.stream = str;
            this.environment = streamEnvironment;
            if (trackingConfiguration.enabled()) {
                StreamEnvironment.TrackingConsumerRegistration registration = streamEnvironment.registerTrackingConsumer(this, trackingConfiguration);
                this.closingTrackingCallback = registration.closingCallback();
                java.util.function.Consumer<MessageHandler.Context> postProcessing = registration.postMessageProcessingCallback();
                if (postProcessing == null) {
                    effectiveHandler = messageHandler;
                } else {
                    // Run the user handler first, then let the tracking side observe the context.
                    effectiveHandler = (context, message) -> {
                        messageHandler.handle(context, message);
                        postProcessing.accept(context);
                    };
                }
                this.trackingCallback = registration.trackingCallback();
            } else {
                this.closingTrackingCallback = () -> {
                };
                this.trackingCallback = Utils.NO_OP_LONG_CONSUMER;
                effectiveHandler = messageHandler;
            }
            this.closingCallback = streamEnvironment.registerConsumer(this, str, offsetSpecification, this.name, effectiveHandler);
            this.status = Status.RUNNING;
        } catch (RuntimeException e) {
            // A consumer that failed to register must report itself as not open.
            this.closed.set(true);
            throw e;
        }
    }

    /**
     * Notifies the tracking callback of the offset and, when the consumer is running and a
     * tracking client is set, asks that client to store it. Failures of the store call are
     * logged at debug level and not propagated.
     *
     * @param j the offset to store
     */
    @Override // com.rabbitmq.stream.Consumer
    public void store(long j) {
        this.trackingCallback.accept(j);
        if (!canTrack()) {
            return;
        }
        // Snapshot the volatile field: unavailable() may null it right after the status check.
        Client client = this.trackingClient;
        if (client == null) {
            return;
        }
        try {
            client.storeOffset(this.name, this.stream, j);
        } catch (Exception e) {
            LOGGER.debug("Error while trying to store offset: {}", e.getMessage());
        }
    }

    /** Returns whether offset tracking calls may be attempted, i.e. the status is RUNNING. */
    private boolean canTrack() {
        return this.status == Status.RUNNING;
    }

    /**
     * Closes the consumer: removes it from the environment and runs the closing callbacks.
     * Only the first call has an effect.
     */
    @Override // com.rabbitmq.stream.Consumer, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.environment.removeConsumer(this);
            // closeFromEnvironment() logs the successful close; no second log here.
            closeFromEnvironment();
        }
    }

    /**
     * Runs the consumer and tracking closing callbacks and marks the consumer closed, without
     * removing it from the environment.
     *
     * <p>The tracking callback and the state update run even if an earlier callback throws, so
     * a failing callback cannot leave the tracking registration in place or the status at
     * RUNNING. The exception is still propagated to the caller.
     */
    public void closeFromEnvironment() {
        try {
            LOGGER.debug("Calling consumer closing callback");
            this.closingCallback.run();
        } finally {
            try {
                LOGGER.debug("Calling tracking consumer closing callback (may be no-op)");
                this.closingTrackingCallback.run();
            } finally {
                this.closed.set(true);
                this.status = Status.CLOSED;
            }
        }
        LOGGER.debug("Closed consumer successfully");
    }

    /**
     * Marks the consumer closed and removes it from the environment. Unlike {@link #close()},
     * this does not invoke the closing callbacks. Only the first close has an effect.
     */
    public void closeAfterStreamDeletion() {
        if (this.closed.compareAndSet(false, true)) {
            this.environment.removeConsumer(this);
            this.status = Status.CLOSED;
        }
    }

    /** Returns {@code true} as long as the closed flag has not been set. */
    public boolean isOpen() {
        return !this.closed.get();
    }

    /**
     * Sets the client used for offset store/query calls.
     *
     * @param client the tracking client
     */
    public synchronized void setClient(Client client) {
        this.trackingClient = client;
    }

    /** Switches the status to NOT_AVAILABLE and drops the tracking client. */
    public synchronized void unavailable() {
        this.status = Status.NOT_AVAILABLE;
        this.trackingClient = null;
    }

    /** Switches the status to RUNNING. */
    public void running() {
        this.status = Status.RUNNING;
    }

    /**
     * Queries the tracking client for the offset stored under this consumer's name and stream.
     *
     * @return the value returned by the client, or {@code 0} when the consumer is not running,
     *     no tracking client is set, or the query fails
     */
    public long lastStoredOffset() {
        if (!canTrack()) {
            return 0L;
        }
        // Snapshot the volatile field: unavailable() may null it right after the status check.
        Client client = this.trackingClient;
        if (client == null) {
            return 0L;
        }
        try {
            return client.queryOffset(this.name, this.stream);
        } catch (Exception ignored) {
            // Best effort: a failed query is reported as offset 0, same as "cannot track".
            return 0L;
        }
    }

    /** Returns the name of the stream this consumer is attached to. */
    public String stream() {
        return this.stream;
    }
}
