package org.jtrim2.stream;

import java.util.Objects;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jtrim2.cancel.Cancellation;
import org.jtrim2.cancel.CancellationController;
import org.jtrim2.cancel.CancellationSource;
import org.jtrim2.cancel.CancellationToken;
import org.jtrim2.cancel.OperationCanceledException;
import org.jtrim2.collections.ReservablePollingQueues;
import org.jtrim2.collections.ReservedElementRef;
import org.jtrim2.concurrent.collections.TerminableQueue;
import org.jtrim2.concurrent.collections.TerminableQueues;
import org.jtrim2.concurrent.collections.TerminatedQueueException;
import org.jtrim2.executor.CancelableTask;
import org.jtrim2.executor.TaskExecutor;
import org.jtrim2.utils.ExceptionHelper;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jtrim2/stream/ParallelSeqGroupProducer.class */
/**
 * A {@link SeqGroupProducer} which pushes the elements of a source producer into a bounded
 * queue on the calling thread, while background workers started on an executor take the
 * elements from that queue and pass them to the consumer.
 *
 * <p>Each call to {@link #transferAll(CancellationToken, SeqConsumer) transferAll} requests a
 * fresh {@code ExecutorRef} from the configured supplier, runs the transfer with a child
 * cancellation source of the given token, and finally calls {@code finishUsage()} on the
 * executor reference regardless of the outcome of the transfer.
 *
 * @param <T> the type of the elements produced
 */
public final class ParallelSeqGroupProducer<T> implements SeqGroupProducer<T> {
    private static final Logger LOGGER = Logger.getLogger(ParallelSeqGroupProducer.class.getName());

    private final SeqGroupProducer<? extends T> srcSeqGroupProducer;
    private final Supplier<ExecutorRef> executorProvider;
    private final int consumerThreadCount;
    private final int totalQueueCapacity;

    /**
     * Single-use implementation doing the actual work of one transfer. It owns the queue,
     * the worker manager and the failure state of that transfer, so it must not be shared
     * between transfers (the queue is shut down at the end of the transfer).
     */
    private static final class UnsafeParallelSeqGroupProducer<T> implements SeqGroupProducer<T> {
        private final SeqGroupProducer<? extends T> srcSeqGroupProducer;
        private final CancellationController cancelController;
        private final int consumerThreadCount;
        private final TerminableQueue<T> queue;
        private final BackgroundWorkerManager queuePollerManager;
        private final ExceptionCollector consumerFailureRef;

        // Written by the producing thread, read by the polling workers; both default to
        // "no failure" / "not finished" until the producer side completes.
        private volatile Throwable producerFailure;
        private volatile boolean producerFinishedNormally;

        /**
         * @param srcSeqGroupProducer the producer whose elements are put into the queue;
         *   must not be {@code null}
         * @param cancelController the controller canceled when a consumer failure is recorded;
         *   must not be {@code null}
         * @param executor the executor handed to the worker manager to run the polling workers
         * @param consumerThreadCount the number of polling workers to start
         * @param queueCapacity the capacity passed to the FIFO queue factory
         */
        public UnsafeParallelSeqGroupProducer(
                SeqGroupProducer<? extends T> srcSeqGroupProducer,
                CancellationController cancelController,
                TaskExecutor executor,
                int consumerThreadCount,
                int queueCapacity) {

            this.srcSeqGroupProducer = Objects.requireNonNull(srcSeqGroupProducer, "srcSeqGroupProducer");
            this.cancelController = Objects.requireNonNull(cancelController, "cancelController");
            this.consumerThreadCount = consumerThreadCount;
            this.queue = TerminableQueues.withWrappedQueue(ReservablePollingQueues.createFifoQueue(queueCapacity));
            // NOTE(review): the two callbacks are presumably the "failed to start workers" and
            // "worker failed" hooks of BackgroundWorkerManager - confirm against that class.
            this.queuePollerManager = new BackgroundWorkerManager(
                    executor,
                    this.queue::shutdown,
                    this::setConsumerFailure);
            this.consumerFailureRef = new ExceptionCollector();
        }

        /**
         * Records the given consumer failure, then shuts down the queue and cancels the
         * transfer. Never throws: anything thrown while doing so is only logged.
         */
        private void setConsumerFailure(Throwable failure) {
            try {
                consumerFailureRef.setFirstFailure(failure);
                queue.shutdown();
                cancelController.cancel();
            } catch (Throwable shutdownFailure) {
                LOGGER.log(Level.SEVERE, "Failed to shutdown consumers.", shutdownFailure);
            }
        }

        /**
         * Takes elements from the queue and passes them to the given consumer until the queue
         * reports termination. Each taken element is released after it was processed, even if
         * processing failed.
         *
         * <p>On termination, the producer failure (if any) and then the latest consumer failure
         * (if any) is rethrown; otherwise the method returns normally.
         *
         * @throws Exception the failure of the consumer, the producer or another worker
         */
        private void pollLoop(CancellationToken cancelToken, ElementConsumer<? super T> consumer) throws Exception {
            while (true) {
                try {
                    ReservedElementRef<T> elementRef = queue.takeButKeepReserved(cancelToken);
                    try {
                        consumer.processElement(elementRef.element());
                    } finally {
                        elementRef.release();
                    }
                } catch (TerminatedQueueException ex) {
                    ExceptionHelper.rethrowCheckedIfNotNull(producerFailure, Exception.class);
                    ExceptionHelper.rethrowCheckedIfNotNull(consumerFailureRef.getLatest(), Exception.class);
                    if (!producerFinishedNormally) {
                        // The queue is only expected to terminate after the producer finished
                        // or after a failure was recorded.
                        throw new AssertionError("Internal-error: Unfinished producer.");
                    }
                    return;
                }
            }
        }

        /**
         * Runs the source producer on the calling thread, putting every produced element into
         * the queue. If putting fails due to cancellation or queue termination, a recorded
         * consumer failure takes precedence over the cancellation/termination exception.
         */
        private void consume(CancellationToken cancelToken) throws Exception {
            srcSeqGroupProducer.transferAllSimple(cancelToken, element -> {
                try {
                    queue.put(cancelToken, element);
                } catch (OperationCanceledException ex) {
                    ExceptionHelper.rethrowCheckedIfNotNull(consumerFailureRef.getLatest(), Exception.class);
                    throw ex;
                } catch (TerminatedQueueException ex) {
                    ExceptionHelper.rethrowCheckedIfNotNull(consumerFailureRef.getLatest(), Exception.class);
                    // Keep the termination exception as the cause to aid debugging.
                    throw new Exception("Consumer did not pull elements.", ex);
                }
            });
        }

        @Override // org.jtrim2.stream.SeqGroupProducer
        public void transferAllSimple(
                CancellationToken cancelToken,
                ElementConsumer<? super T> consumer) throws Exception {

            Objects.requireNonNull(cancelToken, "cancelToken");
            Objects.requireNonNull(consumer, "consumer");

            transferAllGeneric(cancelToken, workerCancelToken -> pollLoop(workerCancelToken, consumer));
        }

        @Override // org.jtrim2.stream.SeqGroupProducer
        public void transferAll(
                CancellationToken cancelToken,
                SeqConsumer<? super T> seqConsumer) throws Exception {

            Objects.requireNonNull(cancelToken, "cancelToken");
            Objects.requireNonNull(seqConsumer, "seqConsumer");

            transferAllGeneric(cancelToken, workerCancelToken -> {
                seqConsumer.consumeAll(workerCancelToken, this::pollLoop);
            });
        }

        /**
         * Starts {@code consumerThreadCount} workers running the given task, feeds the queue
         * from the source producer on the calling thread, then shuts down the queue, waits for
         * the workers and clears the queue. The collected producer/consumer/cleanup failure
         * (if any) is rethrown at the end.
         */
        private void transferAllGeneric(CancellationToken cancelToken, CancelableTask workerTask) throws Exception {
            Throwable toThrow = null;
            try {
                Thread producerThread = Thread.currentThread();
                queuePollerManager.startWorkers(cancelToken, consumerThreadCount, workerCancelToken -> {
                    if (Thread.currentThread() == producerThread) {
                        // A worker running on the producer thread would block the producer
                        // from ever filling the queue.
                        setConsumerFailure(new IllegalStateException(
                                "Executor must not execute tasks synchronously to avoid dead-lock."));
                    } else {
                        workerTask.execute(workerCancelToken);
                    }
                });
                consume(cancelToken);
                producerFinishedNormally = true;
            } catch (Throwable ex) {
                producerFailure = ex;
                toThrow = ex;
            }

            try {
                queue.shutdown();
                queuePollerManager.waitForWorkers();
                queue.clear();
                toThrow = consumerFailureRef.consumeLatestAndUpdate(toThrow);
            } catch (Throwable ex) {
                toThrow = ExceptionCollector.updateException(consumerFailureRef.consumeLatestAndUpdate(toThrow), ex);
            }

            ExceptionHelper.rethrowCheckedIfNotNull(toThrow, Exception.class);
        }
    }

    /**
     * Creates a producer running the consumers of {@code srcSeqGroupProducer} in the background.
     *
     * <p>The capacity of the intermediate queue is {@code consumerThreadCount + extraQueueCapacity},
     * capped at {@code Integer.MAX_VALUE} so that large arguments cannot overflow into a
     * negative capacity.
     *
     * @param executorProvider supplies the executor reference used for a transfer; queried once
     *   per {@code transferAll} call; must not be {@code null}
     * @param consumerThreadCount the number of background workers to start; range checked to be
     *   at least 1
     * @param extraQueueCapacity the queue capacity in addition to one slot per worker; range
     *   checked to be at least 0
     * @param srcSeqGroupProducer the producer providing the elements; must not be {@code null}
     * @throws NullPointerException if {@code executorProvider} or {@code srcSeqGroupProducer}
     *   is {@code null}
     */
    public ParallelSeqGroupProducer(
            Supplier<ExecutorRef> executorProvider,
            int consumerThreadCount,
            int extraQueueCapacity,
            SeqGroupProducer<? extends T> srcSeqGroupProducer) {

        this.srcSeqGroupProducer = Objects.requireNonNull(srcSeqGroupProducer, "srcSeqGroupProducer");
        this.executorProvider = Objects.requireNonNull(executorProvider, "executorProvider");
        this.consumerThreadCount = ExceptionHelper
                .checkArgumentInRange(consumerThreadCount, 1, Integer.MAX_VALUE, "backgroundThreadCount");

        int checkedExtraCapacity = ExceptionHelper
                .checkArgumentInRange(extraQueueCapacity, 0, Integer.MAX_VALUE, "extraQueueCapacity");
        // Sum in long and saturate: a plain int addition would silently wrap to a negative value.
        this.totalQueueCapacity = (int) Math.min((long) consumerThreadCount + checkedExtraCapacity, Integer.MAX_VALUE);
    }

    /**
     * Transfers all elements of the source producer to the given consumer using background
     * workers. The executor reference obtained for this call is always notified via
     * {@code finishUsage()}; a failure of that notification is combined with the failure of
     * the transfer (if any) before being rethrown.
     *
     * @param cancelToken the token signaling cancellation of the transfer; must not be {@code null}
     * @param seqConsumer the consumer processing the elements; must not be {@code null}
     * @throws Exception the failure of the transfer or of releasing the executor
     */
    @Override // org.jtrim2.stream.SeqGroupProducer
    public void transferAll(CancellationToken cancelToken, SeqConsumer<? super T> seqConsumer) throws Exception {
        Objects.requireNonNull(cancelToken, "cancelToken");
        Objects.requireNonNull(seqConsumer, "seqConsumer");

        Throwable toThrow = null;
        ExecutorRef executorRef = executorProvider.get();
        try {
            // A child source lets a consumer failure cancel this transfer without
            // canceling the caller's own token.
            CancellationSource cancellation = Cancellation.createChildCancellationSource(cancelToken);
            UnsafeParallelSeqGroupProducer<T> transfer = new UnsafeParallelSeqGroupProducer<T>(
                    srcSeqGroupProducer,
                    cancellation.getController(),
                    executorRef.getExecutor(),
                    consumerThreadCount,
                    totalQueueCapacity);
            transfer.transferAll(cancellation.getToken(), seqConsumer);
        } catch (Throwable ex) {
            toThrow = ex;
        }

        try {
            executorRef.finishUsage();
        } catch (Throwable ex) {
            toThrow = ExceptionCollector.updateException(toThrow, ex);
        }

        ExceptionHelper.rethrowCheckedIfNotNull(toThrow, Exception.class);
    }
}
